使用三个输出通道,0显示,1推流,2AI ,开启编码后,报错get display buffer failed

Viewed 38

重现步骤
推流是自己封装的,单独使用没问题
报下面的错误

image.png
去掉推流 出现同样的错误

#        _thread.start_new_thread(rtsppusher._do_rtsp_stream, ())

image.png

去掉 Display.show_image()或encoder.Start(VENC_CHN_ID_0)可以运行

期待结果和实际结果

软硬件版本信息

错误日志

尝试解决过程

补充材料

import ujson
import network
import os
from time import *
from media.vencoder import *
from media.sensor import *
from media.media import *
from media.display import *
import multimedia as mm
import nncase_runtime as nn
import gc
import _thread
import time
#from libs.PipeLineM import PipeLine
from libs.YOLO import YOLO11
from libs.Utils import *
import ulab.numpy as np


# Module-level globals. Both are assigned real objects in the __main__ block;
# `encoder` is additionally read by RtspPusher._do_rtsp_stream from its
# worker thread, so it must be created before that thread starts.
sensor = None
encoder = None

# 解析配置文件
def read_deploy_config(config_path):
    # 打开JSON文件以进行读取deploy_config
    with open(config_path, 'r') as json_file:
        try:
            # 从文件中加载JSON数据
            config = ujson.load(json_file)
        except ValueError as e:
            print("JSON 解析错误:", e)
    return config

# 连接WiFi
def wifi_connect(ssid, key):
    sta=network.WLAN(0)
    while(sta.active() == False):
        sta.active(1)
    print("wifi激活......")
    max_retries = 10
    retry_count = 0
    while not sta.status() and retry_count < max_retries:
        sta.connect("*****", "*******")
        retry_count += 1

    sleep(3)
    if sta.status():
        print("wifi连接成功......",sta.ifconfig())
    else:
        print("连接失败,达到最大重试次数")

def _get_timestamp_ticket():
    ns = time.time_ns()  # 纳秒
    return ns // 1000     # 转为微秒

class RtspPusher:
    """Pushes encoded video packets to an RTSP server via mm.rtsp_pusher.

    The worker loop (_do_rtsp_stream) pulls packets from the MODULE-LEVEL
    `encoder` global, not from an attribute of this class, so the encoder
    channel must be created and started before the worker thread runs.
    """

    def __init__(self, sRtspUrl,video_width=1280,video_height=720,video_type=mm.multi_media_type.media_h264, enable_audio=False):
        self.sRtspUrl = sRtspUrl
        self.video_width = video_width
        self.video_height = video_height
        self.video_type = video_type        # video payload type (H264/H265)
        self.enable_audio = enable_audio    # whether audio push is enabled (unused in this file)
        self.rtsppusher = mm.rtsp_pusher()
        self.venc_chn = VENC_CHN_ID_0       # video encoder channel id
        self.start_stream = False            # True while the worker loop should run
        self.runthread_over = False          # set True once the worker loop has exited
        self.timestamp_video_ = 0            # base timestamp (us) of the first pushed packet

    def start(self):
        # Initialize the pusher with the stream geometry/URL, open the RTSP
        # connection, then allow the worker loop to run.
        self.rtsppusher.rtsppusher_init(self.video_width,self.video_height,self.sRtspUrl)
        print("rtsp初始化成功")
        self.rtsppusher.rtsppusher_open()
        print("rtsp连接成功")
        self.start_stream = True


    def stop(self):
        # Signal the worker loop to exit, wait for it, then close and
        # deinitialize the pusher. No-op when streaming was never started.
        if not self.start_stream:
            return
        # Wait for the streaming thread to exit.
        print("等待推流线程退出")
        self.start_stream = False
        while not self.runthread_over:
            sleep(0.1)
        self.runthread_over = False
        self.rtsppusher.rtsppusher_close()
        print("rtsp关闭连接")
        self.rtsppusher.rtsppusher_deinit()
        print("rtsp销毁完成")

    def _do_rtsp_stream(self):
        # Worker loop (run in its own thread): pull encoded packets from the
        # module-level `encoder` and push each one to the RTSP server.
        try:
            streamData = StreamData()
            while self.start_stream:
                os.exitpoint()
                # Fetch one encoded frame (may contain several packs).
                encoder.GetStream(VENC_CHN_ID_0, streamData)
                # Push every pack of this frame.
                for pack_idx in range(0, streamData.pack_cnt):
                    stream_data = bytes(uctypes.bytearray_at(streamData.data[pack_idx], streamData.data_size[pack_idx]))
                    key_frame = streamData.stream_type[pack_idx]
                    if 0 == self.timestamp_video_:
                        self.timestamp_video_ = _get_timestamp_ticket()

                    # For I-frame packs, look at the NALU type byte to tell a
                    # real IDR slice (type 5) from other slices.
                    # NOTE(review): stream_data[4] assumes a 4-byte Annex-B
                    # start code (00 00 00 01) and H.264 NALU layout — confirm.
                    if encoder.STREAM_TYPE_I == key_frame:
                        nalu_type = stream_data[4]
                        if (nalu_type & 0x1F) == 5:
                            key_frame = encoder.STREAM_TYPE_I
                        else:
                            key_frame = encoder.STREAM_TYPE_P
                    b_frame_type = 1 if key_frame == encoder.STREAM_TYPE_I else 0
                    self.rtsppusher.rtsppusher_pushvideodata(stream_data, streamData.data_size[pack_idx],b_frame_type,_get_timestamp_ticket() - self.timestamp_video_)
                # Release the frame back to the encoder's buffer pool.
                encoder.ReleaseStream(VENC_CHN_ID_0, streamData)
        except BaseException as e:
            print(f"Exception {e}")
        finally:
            self.runthread_over = True
            # NOTE(review): stop() is invoked here from inside the worker
            # thread; its wait loop passes because runthread_over was just set,
            # but stop() then resets the flag to False and the line below sets
            # it True again. The external rtsppusher.stop() in __main__'s
            # finally block then early-returns (start_stream already False).
            # This flag dance is fragile — verify shutdown ordering.
            self.stop()
        self.runthread_over = True


if __name__=="__main__":
    # --- Configuration -----------------------------------------------------
    config_path = "/data/deploy_config.json"
    print("开始执行......")
    # Load runtime settings: WiFi credentials, display and RTSP geometry.
    deploy_conf=read_deploy_config(config_path)
    WIFI_NAME = deploy_conf["wifi_name"]
    WIFI_PASSWORD = deploy_conf["wifi_password"]
    DISPLAY_WIDTH = deploy_conf["display_width"]
    DISPLAY_HEIGHT = deploy_conf["display_height"]
    RTSP_WIDTH = deploy_conf["rtsp_width"]
    RTSP_HEIGHT = deploy_conf["rtsp_height"]
    RTSP_PUSH_URL = deploy_conf["rtsp_push_url"]
    # Join the network before opening the RTSP push session.
    wifi_connect(WIFI_NAME, WIFI_PASSWORD)

    # --- Sensor ------------------------------------------------------------
    sensor = Sensor(id=0,width=1920, height=1080,fps=30)
    sensor.reset()
    sensor.set_hmirror(False)
    sensor.set_vflip(False)

    # Channel 0 -> display (VO), YUV420SP, bound directly to a video layer.
    sensor.set_framesize(width=DISPLAY_WIDTH, height=DISPLAY_HEIGHT, chn=CAM_CHN_ID_0)
    sensor.set_pixformat(sensor.YUV420SP)
    sensor_bind_info = sensor.bind_info(x = 0, y = 0, chn = CAM_CHN_ID_0)
    Display.bind_layer(**sensor_bind_info, layer = Display.LAYER_VIDEO1)
    # NOTE(review): VIRT display + bound LAYER_VIDEO1 + a running encoder is
    # the combination that reportedly raises "get display buffer failed" —
    # presumably a media buffer pool exhaustion; confirm buffer allocation
    # for this three-channel setup against the platform docs.
    Display.init(Display.VIRT, width=1920, height=1080, to_ide=True)

    # Channel 2 -> AI inference, planar RGB888.
    sensor.set_framesize(width=DISPLAY_WIDTH, height=DISPLAY_HEIGHT, chn=CAM_CHN_ID_2)
    sensor.set_pixformat(PIXEL_FORMAT_RGB_888_PLANAR, chn=CAM_CHN_ID_2)

    # Channel 1 -> H.264 encoder for RTSP push, YUV420SP.
    sensor.set_framesize(width=RTSP_WIDTH, height=RTSP_HEIGHT, chn=CAM_CHN_ID_1)
    sensor.set_pixformat(sensor.YUV420SP, chn=CAM_CHN_ID_1)

    # --- Encoder -----------------------------------------------------------
    encoder = Encoder()
    encoder.SetOutBufs(VENC_CHN_ID_0, 8, RTSP_WIDTH, RTSP_HEIGHT)
    # Link camera channel 1 directly to the video encoder.
    venc_link = MediaManager.link(sensor.bind_info(chn=CAM_CHN_ID_1)['src'], (VIDEO_ENCODE_MOD_ID, VENC_DEV_ID, VENC_CHN_ID_0))
    MediaManager.init()

    # BUG FIX: the original passed RTSP_HEIGHT as BOTH width and height
    # (ChnAttrStr(..., RTSP_HEIGHT, RTSP_HEIGHT, ...)); use RTSP_WIDTH for
    # the width so the encoder geometry matches channel 1's frame size.
    chnAttr = ChnAttrStr(encoder.PAYLOAD_TYPE_H264, encoder.H264_PROFILE_MAIN, RTSP_WIDTH, RTSP_HEIGHT,bit_rate=400)
    encoder.Create(VENC_CHN_ID_0, chnAttr)

    sensor.run()
    encoder.Start(VENC_CHN_ID_0)

    # --- RTSP pusher -------------------------------------------------------
    rtsppusher = RtspPusher(sRtspUrl=RTSP_PUSH_URL,video_width=RTSP_WIDTH,video_height=RTSP_HEIGHT)
    rtsppusher.start()

    # --- YOLO11 detector ---------------------------------------------------
    cur_frame = None
    kmodel_path="/data/best_20250606.kmodel"
    labels = ["people","未带安全帽","安全帽","反光衣","未穿反光衣","翻越护栏","未翻越护栏"]
    model_input_size=[640,640]
    rgb888p_size=[1920,1080]
    display_size=[1920,1080]
    confidence_threshold = 0.3
    nms_threshold=0.4

    # Single OSD overlay image (the original allocated this twice).
    osd_img = image.Image(display_size[0], display_size[1], image.ARGB8888)

    yolo=YOLO11(task_type="detect",mode="video",kmodel_path=kmodel_path,labels=labels,rgb888p_size=rgb888p_size,model_input_size=model_input_size,display_size=display_size,conf_thresh=confidence_threshold,nms_thresh=nms_threshold,max_boxes_num=50,debug_mode=0)
    yolo.colors = [(125,0,0,255), (255,255,0,0), (125,0,255,0),(125,0,255,0),(255,255,0,0),(255,255,0,0), (125,0,255,0)] #蓝红绿绿红红绿
    yolo.config_preprocess()

    # --- Main loop ---------------------------------------------------------
    try:
        _thread.start_new_thread(rtsppusher._do_rtsp_stream, ())
        while True:
            os.exitpoint()
            # Grab an RGB888 frame from the AI channel and run detection.
            cur_frame = sensor.snapshot(chn=CAM_CHN_ID_2)
            img=cur_frame.to_numpy_ref()
            res=yolo.run(img)
            # BUG FIX: clear stale overlay content BEFORE drawing the new
            # results. The original drew first and then cleared when `res`
            # was truthy, which erased the freshly drawn boxes.
            osd_img.clear()
            yolo.draw_result(res,osd_img)
            if res:
                for i in range(len(res[0])):
                    resu = labels[res[1][i]] + " {0:.3f}".format(res[2][i])
                    print(resu)
            Display.show_image(osd_img, 0, 0, Display.LAYER_OSD3)
            gc.collect()
            sleep(0.5)

    except KeyboardInterrupt:
        print("中断推流")
    finally:
        # Tear down in reverse order of creation.
        rtsppusher.stop()
        yolo.deinit()
        sensor.stop()
        del venc_link
        Display.deinit()
        encoder.Stop(VENC_CHN_ID_0)
        encoder.Destroy(VENC_CHN_ID_0)
        time.sleep_ms(50)
        MediaManager.deinit()
        gc.collect()
    print("程序结束")

1 Answers

你好,请提供一个极简可复现问题的 Demo:剔除推流相关逻辑,仅保留直接触发问题的必要功能

https://www.kendryte.com/answer/questions/10010000000005132
极简Demo未出现原问题,但出现了 sensor(0) snapshot chn(2) failed(3) 这个错误,该错误在原先的程序中也出现过,可以先排查这个问题。