重现步骤
推流是自己封装的,单独使用没问题
报下面的错误
去掉推流 出现同样的错误
# _thread.start_new_thread(rtsppusher._do_rtsp_stream, ())
去掉 Display.show_image()或encoder.Start(VENC_CHN_ID_0)可以运行
期待结果和实际结果
软硬件版本信息
错误日志
尝试解决过程
补充材料
import ujson
import network
import os
from time import *
from media.vencoder import *
from media.sensor import *
from media.media import *
from media.display import *
import multimedia as mm
import nncase_runtime as nn
import gc
import _thread
import time
#from libs.PipeLineM import PipeLine
from libs.YOLO import YOLO11
from libs.Utils import *
import ulab.numpy as np
# 全局变量
sensor = None
encoder = None
# 解析配置文件
def read_deploy_config(config_path):
# 打开JSON文件以进行读取deploy_config
with open(config_path, 'r') as json_file:
try:
# 从文件中加载JSON数据
config = ujson.load(json_file)
except ValueError as e:
print("JSON 解析错误:", e)
return config
# 连接WiFi
def wifi_connect(ssid, key):
sta=network.WLAN(0)
while(sta.active() == False):
sta.active(1)
print("wifi激活......")
max_retries = 10
retry_count = 0
while not sta.status() and retry_count < max_retries:
sta.connect("*****", "*******")
retry_count += 1
sleep(3)
if sta.status():
print("wifi连接成功......",sta.ifconfig())
else:
print("连接失败,达到最大重试次数")
def _get_timestamp_ticket():
ns = time.time_ns() # 纳秒
return ns // 1000 # 转为微秒
class RtspPusher:
def __init__(self, sRtspUrl,video_width=1280,video_height=720,video_type=mm.multi_media_type.media_h264, enable_audio=False):
self.sRtspUrl = sRtspUrl
self.video_width = video_width
self.video_height = video_height
self.video_type = video_type # 视频编码类型( H264/H265 )
self.enable_audio = enable_audio # 是否启用音频
self.rtsppusher = mm.rtsp_pusher()
self.venc_chn = VENC_CHN_ID_0 # 视频编码通道
self.start_stream = False # 是否启动推流线程
self.runthread_over = False # 推流线程是否结束
self.timestamp_video_ = 0
def start(self):
self.rtsppusher.rtsppusher_init(self.video_width,self.video_height,self.sRtspUrl)
print("rtsp初始化成功")
self.rtsppusher.rtsppusher_open()
print("rtsp连接成功")
self.start_stream = True
def stop(self):
if not self.start_stream:
return
# 等待推流线程退出
print("等待推流线程退出")
self.start_stream = False
while not self.runthread_over:
sleep(0.1)
self.runthread_over = False
self.rtsppusher.rtsppusher_close()
print("rtsp关闭连接")
self.rtsppusher.rtsppusher_deinit()
print("rtsp销毁完成")
def _do_rtsp_stream(self):
try:
streamData = StreamData()
while self.start_stream:
os.exitpoint()
# 获取一帧码流
encoder.GetStream(VENC_CHN_ID_0, streamData)
# 推流
for pack_idx in range(0, streamData.pack_cnt):
stream_data = bytes(uctypes.bytearray_at(streamData.data[pack_idx], streamData.data_size[pack_idx]))
key_frame = streamData.stream_type[pack_idx]
if 0 == self.timestamp_video_:
self.timestamp_video_ = _get_timestamp_ticket()
if encoder.STREAM_TYPE_I == key_frame:
nalu_type = stream_data[4]
if (nalu_type & 0x1F) == 5:
key_frame = encoder.STREAM_TYPE_I
else:
key_frame = encoder.STREAM_TYPE_P
b_frame_type = 1 if key_frame == encoder.STREAM_TYPE_I else 0
self.rtsppusher.rtsppusher_pushvideodata(stream_data, streamData.data_size[pack_idx],b_frame_type,_get_timestamp_ticket() - self.timestamp_video_)
# 释放一帧码流
encoder.ReleaseStream(VENC_CHN_ID_0, streamData)
except BaseException as e:
print(f"Exception {e}")
finally:
self.runthread_over = True
# 停止 rtsp server
self.stop()
self.runthread_over = True
if __name__=="__main__":
#配置
config_path = "/data/deploy_config.json"
print("开始执行......")
# 读取配置
deploy_conf=read_deploy_config(config_path)
WIFI_NAME = deploy_conf["wifi_name"]
WIFI_PASSWORD = deploy_conf["wifi_password"]
DISPLAY_WIDTH = deploy_conf["display_width"]
DISPLAY_HEIGHT = deploy_conf["display_height"]
RTSP_WIDTH = deploy_conf["rtsp_width"]
RTSP_HEIGHT = deploy_conf["rtsp_height"]
RTSP_PUSH_URL = deploy_conf["rtsp_push_url"]
# 连接网络
wifi_connect(WIFI_NAME, WIFI_PASSWORD)
# 初始化并配置sensor
sensor = Sensor(id=0,width=1920, height=1080,fps=30)
sensor.reset()
# 设置镜像
sensor.set_hmirror(False)
# 设置翻转
sensor.set_vflip(False)
# 通道0直接给到显示VO,格式为YUV420
sensor.set_framesize(width=DISPLAY_WIDTH, height=DISPLAY_HEIGHT, chn=CAM_CHN_ID_0)
sensor.set_pixformat(sensor.YUV420SP)
sensor_bind_info = sensor.bind_info(x = 0, y = 0, chn = CAM_CHN_ID_0)
Display.bind_layer(**sensor_bind_info, layer = Display.LAYER_VIDEO1)
Display.init(Display.VIRT, width=1920, height=1080, to_ide=True)
# 通道2给到AI做算法处理,格式为RGB888
sensor.set_framesize(width=DISPLAY_WIDTH, height=DISPLAY_HEIGHT, chn=CAM_CHN_ID_2)
sensor.set_pixformat(PIXEL_FORMAT_RGB_888_PLANAR, chn=CAM_CHN_ID_2)
# 通道1直接给到推流,格式为YUV420
sensor.set_framesize(width=RTSP_WIDTH, height=RTSP_HEIGHT, chn=CAM_CHN_ID_1)
sensor.set_pixformat(sensor.YUV420SP, chn=CAM_CHN_ID_1)
# 实例化 video encoder
encoder = Encoder()
encoder.SetOutBufs(VENC_CHN_ID_0, 8, RTSP_WIDTH, RTSP_HEIGHT)
# 绑定 camera 和 venc
venc_link = MediaManager.link(sensor.bind_info(chn=CAM_CHN_ID_1)['src'], (VIDEO_ENCODE_MOD_ID, VENC_DEV_ID, VENC_CHN_ID_0))
# init media manager
MediaManager.init()
# 创建编码器
chnAttr = ChnAttrStr(encoder.PAYLOAD_TYPE_H264, encoder.H264_PROFILE_MAIN, RTSP_HEIGHT, RTSP_HEIGHT,bit_rate=400)
encoder.Create(VENC_CHN_ID_0, chnAttr)
sensor.run()
encoder.Start(VENC_CHN_ID_0)
# pl.sensor = sensor
# RtspPusher
rtsppusher = RtspPusher(sRtspUrl=RTSP_PUSH_URL,video_width=RTSP_WIDTH,video_height=RTSP_HEIGHT)
rtsppusher.start()
cur_frame = None
kmodel_path="/data/best_20250606.kmodel"
labels = ["people","未带安全帽","安全帽","反光衣","未穿反光衣","翻越护栏","未翻越护栏"]
model_input_size=[640,640]
rgb888p_size=[1920,1080]
display_size=[1920,1080]
confidence_threshold = 0.3
nms_threshold=0.4
osd_img = image.Image(display_size[0], display_size[1], image.ARGB8888)
# 初始化YOLO11实例
yolo=YOLO11(task_type="detect",mode="video",kmodel_path=kmodel_path,labels=labels,rgb888p_size=rgb888p_size,model_input_size=model_input_size,display_size=display_size,conf_thresh=confidence_threshold,nms_thresh=nms_threshold,max_boxes_num=50,debug_mode=0)
yolo.colors = [(125,0,0,255), (255,255,0,0), (125,0,255,0),(125,0,255,0),(255,255,0,0),(255,255,0,0), (125,0,255,0)] #蓝红绿绿红红绿
yolo.config_preprocess()
osd_img = image.Image(display_size[0], display_size[1], image.ARGB8888)
try:
_thread.start_new_thread(rtsppusher._do_rtsp_stream, ())
while True:
os.exitpoint()
cur_frame = sensor.snapshot(chn=CAM_CHN_ID_2)
img=cur_frame.to_numpy_ref()
res=yolo.run(img)
yolo.draw_result(res,osd_img)
if res:
osd_img.clear()
for i in range(len(res[0])):
resu = labels[res[1][i]] + " {0:.3f}".format(res[2][i])
print(resu)
# x, y, w, h = map(lambda x: int(round(x, 0)), res[0][i])
# osd_img.draw_rectangle(x,y, w, h, color=yolo.colors[res[1][i]],thickness=4)
# osd_img.draw_string_advanced( x , y-50,32," " + labels[res[1][i]] + " {0:.3f}".format(res[2][i]) , color=yolo.colors[res[1][i]])
Display.show_image(osd_img, 0, 0, Display.LAYER_OSD3)
gc.collect()
sleep(0.5)
except KeyboardInterrupt:
# print(f"Exception {e}")
print("中断推流")
finally:
rtsppusher.stop()
yolo.deinit()
sensor.stop()
del venc_link
Display.deinit()
encoder.Stop(VENC_CHN_ID_0)
encoder.Destroy(VENC_CHN_ID_0)
time.sleep_ms(50)
MediaManager.deinit()
gc.collect()
print("程序结束")