LCKFB开发板通过CanMV拿三路相机的数据并编码encoder3.ReleaseStream()函数报错

Viewed 40

重现步骤

CanMV拿三路相机的数据并编码,硬件配置:
image.png

期待结果和实际结果

长时间稳定运行,但运行不久就报错:
f14ee36af6411fe527a65c3ea0f59d03.png

软硬件版本信息

错误日志

尝试解决过程

补充材料

2 Answers

麻烦提供一个完整的可以正常运行的代码,你得截图看不到前面做了什么操作。另外单线程处理多路推流效率不行,需改进。

好的,这个是我的使用的代码。另外单线程处理多路推流的问题我也同步改一下测试

# Video encode example
#
# Note: You will need an SD card to run this example.
#
# You can capture videos and encode them into 264/265 files

import _thread
from media.vencoder import *
from media.sensor import *
#from media.media import *
import time, os
import utime
import network, socket
import multimedia as mm

from machine import FPIOA
from machine import I2C

import select
import time, os
from machine import UART
from machine import FPIOA
import struct

class Lidar:
    def __init__(self):
        fpioa = FPIOA()
        fpioa.set_function(50, FPIOA.UART3_TXD)
        fpioa.set_function(51, FPIOA.UART3_RXD)
        # 初始化UART2,波特率115200,8位数据位,无校验,1位停止位
        self.uart = UART(UART.UART3, baudrate=230400, bits=UART.EIGHTBITS, parity=UART.PARITY_NONE, stop=UART.STOPBITS_ONE)

    def recv(self):
        return self.uart.read();

class Mpu6050:
    def __init__(self):
        # 实例化FPIOA
        fpioa = FPIOA()

        # 设置 pin 11 12 为 IIC2
        fpioa.set_function(11, FPIOA.IIC2_SCL)
        fpioa.set_function(12, FPIOA.IIC2_SDA)

        # 获取指定功能当前所用的引脚
        print(fpioa.get_pin_num(FPIOA.IIC2_SCL))
        print(fpioa.get_pin_num(FPIOA.IIC2_SDA))


        self.i2c2 = I2C(2, freq=100000)
        a=self.i2c2.scan()
        print(a)

        # MPU6050 初始化
        self.i2c2.writeto_mem(0x68,0x6B,bytes([0x80]),mem_size=8)  # 复位 MPU6050
        time.sleep(0.1)
        self.i2c2.writeto_mem(0x68,0x6B,bytes([0x00]),mem_size=8)  # 唤醒 MPU6050,进入正常工作状态

        self.i2c2.writeto_mem(0x68,0x1B,bytes([0x18]),mem_size=8)  # 设置 MPU6050 陀螺仪传感器满量程范围 正负 2000dps
        self.i2c2.writeto_mem(0x68,0x1C,bytes([0x00]),mem_size=8)  # 加速度传感器配置寄存器这里 正负 2g
        self.i2c2.writeto_mem(0x68,0x19,bytes([0x19]),mem_size=8)  # 陀螺仪采样率 50hz
        self.i2c2.writeto_mem(0x68,0x1A,bytes([0x03]),mem_size=8)  # 设置 MPU6050 的数字低通滤波器 50hz  0x03,42hz

        self.i2c2.writeto_mem(0x68,0x38,bytes([0x00]),mem_size=8)  # 关闭所有中断
        self.i2c2.writeto_mem(0x68,0x6A,bytes([0x00]),mem_size=8)  # I2C主模式关闭
        self.i2c2.writeto_mem(0x68,0x23,bytes([0x00]),mem_size=8)  # 关闭FIFO
        self.i2c2.writeto_mem(0x68,0x37,bytes([0x80]),mem_size=8)  # INT引脚低电平有效

        id_buf = self.i2c2.readfrom_mem(0x68,0x75,1,mem_size=8)  # 读取MPU6050的器件地址
        if (id_buf[0] != 0x68):
            print("检测不到 MPU6050 模块", hex(id_buf[0]))
            sys.exit(1)

        self.i2c2.writeto_mem(0x68,0x6B,bytes([0x01]),mem_size=8)  # 设置CLKSEL,PLL X轴为参考
        self.i2c2.writeto_mem(0x68,0x6C,bytes([0x00]),mem_size=8)  # 加速度与陀螺仪都工作
        self.i2c2.writeto_mem(0x68,0x19,bytes([0x19]),mem_size=8)  # 陀螺仪采样率 50hz
        self.i2c2.writeto_mem(0x68,0x1A,bytes([0x03]),mem_size=8)  # 设置 MPU6050 的数字低通滤波器 50hz  0x03,42hz
        self.gyro_buf = bytearray(6)
        self.acc_buf = bytearray(6)
        self.gyro_acc_data = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]

    def recv(self):
        """读取并解析陀螺仪和加速度计数据"""
        try:
            self.gyro_buf = self.i2c2.readfrom_mem(0x68,0x43,6,mem_size=8)
            self.acc_buf = self.i2c2.readfrom_mem(0x68,0x3B,6,mem_size=8)
            # 计算陀螺仪数据
            self.gyro_acc_data[0] = (self.gyro_buf[1] + (self.gyro_buf[0] << 8)) * 0.001064
            self.gyro_acc_data[1] = (self.gyro_buf[3] + (self.gyro_buf[2] << 8)) * 0.001064
            self.gyro_acc_data[2] = (self.gyro_buf[5] + (self.gyro_buf[4] << 8)) * 0.001064
            # 计算加速度计数据
            self.gyro_acc_data[3] = (self.acc_buf[1] + (self.acc_buf[0] << 8)) / 16384
            self.gyro_acc_data[4] = (self.acc_buf[3] + (self.acc_buf[2] << 8)) / 16384
            self.gyro_acc_data[5] = (self.acc_buf[5] + (self.acc_buf[4] << 8)) / 16384

            return self.gyro_acc_data
        except Exception as e:
            print(f"数据读取错误: {e}")
            return None

class TCPServer:
    def __init__(self, host='0.0.0.0', port=8888):
        self.host = host
        self.port = port
        self.server_socket = None
        self.client_socket = None
        self.client_address = None
        self.accept_thread = None
        self.running = False

    def set_noblock(self):
        self.client_socket.setblocking(False)

    def start(self):
        """启动TCP服务器"""
        try:
            self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_socket.bind((self.host, self.port))
            self.server_socket.listen(1)
            self.server_socket.setblocking(False)
            print(f"TCP服务器已启动,监听地址: {self.host}:{self.port}")

            # 启动接受连接的线程(修正后的写法)
            self.running = True
#            _thread.start_new_thread(self._accept_connections, ())
#            return True
        except Exception as e:
            print(f"启动服务器失败: {e}", self.port)
            return False

        try:
            print("--->")
            client_socket, client_address = self.server_socket.accept()
            print("<---")

            self._close_client()
            self.client_socket = client_socket
            self.client_address = client_address
            print(f"客户端已连接: {client_address}")

        except Exception as e:
            if self.running:
                print(f"接受连接时出错: {e}")
                time.sleep(0.1)

    def _accept_connections(self):
        """在单独线程中处理客户端连接"""
        while self.running:
            try:
                print("--->")
                client_socket, client_address = self.server_socket.accept()
                print("<---")

                self._close_client()
                self.client_socket = client_socket
                self.client_address = client_address
                print(f"客户端已连接: {client_address}")

            except Exception as e:
                if self.running:
                    print(f"接受连接时出错: {e}")
                    time.sleep(0.1)

    def recv(self):
        """
        非阻塞接收单个字节数据(超时时间为0)
        :return: 接收到的字节数据(bytes类型),无数据时返回None
        """
        if not self.client_socket:
            print("无活跃的客户端连接", self.port)
            return None

        try:
            # 使用select立即检查是否有数据可读(超时时间0秒)
#            readable, _, _ = select.select([self.client_socket], [], [], 1)

#            if readable:
                # 有数据可读,接收1个字节
            data = self.client_socket.recv(1)

            if data:
                return data
            else:
                # 接收到空数据表示客户端断开连接
                print("客户端已断开连接")
                self._close_client()
                return None
#            else:
#                # 无数据可读,立即返回None
#                return None

        except Exception as e:
            print(f"接收数据异常: {e}")
            self._close_client()
            return None


    def send(self, data):
        """发送数据到客户端"""
        # 检查是否已有客户端连接
        if self.client_socket:
            try:
                # 发送数据头
                self.client_socket.sendall(bytes([0xEB, 0x90, 0x2A, 0x00]))
                # 发送时间戳
                timestamp = utime.ticks_us()
                timestamp_64bit = timestamp.to_bytes(8, 'little')
                self.client_socket.sendall(timestamp_64bit)
                # 发送数据长度
                data_length = len(data)
                length_bytes = data_length.to_bytes(4, 'little')
                self.client_socket.sendall(length_bytes)
                # 发送实际数据
                self.client_socket.sendall(data)
                return True
            except Exception as e:
                print(f"发送数据失败: {e}")
                self._close_client()
                return False
        else:
            print("没有活跃的客户端连接", self.port)
            return False

    def _close_client(self):
        """关闭当前客户端连接"""
        if self.client_socket:
            try:
                self.client_socket.close()
            except:
                pass
            self.client_socket = None
            self.client_address = None
            print("客户端连接已关闭")

    def close(self):
        """关闭服务器和客户端连接"""
        self.running = False

        # 关闭服务器套接字(强制 accept() 抛出异常)
        if self.server_socket:
            try:
                self.server_socket.close()
            except:
                pass
            self.server_socket = None

        # 等待接受线程结束
        if self.accept_thread and self.accept_thread.is_alive():
            self.accept_thread.join(timeout=1.0)

        # 关闭客户端连接
        self._close_client()

        print("TCP服务器已关闭")

switch = 0;
def thread_function(name):
    camera_switch = TCPServer("12.12.12.100", 9000)
    camera_switch.start()
    global switch
    while (True):
        temp_switch = camera_switch.recv()
        if temp_switch is not None:
            switch = temp_switch[0]
            print("switch:", switch)


def vi_bind_venc_test(camera_id, width=1280, height=720):
    print("venc_test start")
    venc_chn = VENC_CHN_ID_0
    width = ALIGN_UP(width, 16)

    _thread.start_new_thread(thread_function, ("Thread-1",))

#    camera_switch = TCPServer("12.12.12.100", 9000)
    camera_server = TCPServer("12.12.12.100", 9001)
#    imu_server = TCPServer("12.12.12.100", 9010)
#    lidar_server = TCPServer("12.12.12.100", 9020)

#    camera_switch.start()
#    camera_switch.set_noblock()
    time.sleep(1)
    camera_server.start()
    time.sleep(0.1)
#    imu_server.start()
#    time.sleep(0.1)
#    lidar_server.start()
    print("networ init ok")

    # 初始化sensor
    sensor1 = Sensor(id=0, fps=30)
    sensor1.reset()
    sensor2 = Sensor(id=1, fps=30)
    sensor2.reset()
    sensor3 = Sensor(id=2, fps=30)
    sensor3.reset()
    print("sensor_reset init ok")

    # 设置camera 输出buffer
    sensor1.set_framesize(width = width, height = height, alignment=12)
    sensor1.set_pixformat(Sensor.YUV420SP)
    sensor2.set_framesize(width = width, height = height, alignment=12)
    sensor2.set_pixformat(Sensor.YUV420SP)
    sensor3.set_framesize(width = width, height = height, alignment=12)
    sensor3.set_pixformat(Sensor.YUV420SP)
    print("set_pixformat init ok")


    # 实例化video encoder
    encoder1 = Encoder()
    encoder1.SetOutBufs(VENC_CHN_ID_0, 8, width, height)
    encoder2 = Encoder()
    encoder2.SetOutBufs(VENC_CHN_ID_1, 8, width, height)
    encoder3 = Encoder()
    encoder3.SetOutBufs(VENC_CHN_ID_2, 8, width, height)
    print("SetOutBufs init ok")

    # 绑定camera和venc
    link1 = MediaManager.link(sensor1.bind_info()['src'], (VIDEO_ENCODE_MOD_ID, VENC_DEV_ID, VENC_CHN_ID_0))
    link2 = MediaManager.link(sensor2.bind_info()['src'], (VIDEO_ENCODE_MOD_ID, VENC_DEV_ID, VENC_CHN_ID_1))
    link3 = MediaManager.link(sensor3.bind_info()['src'], (VIDEO_ENCODE_MOD_ID, VENC_DEV_ID, VENC_CHN_ID_2))
    print("link init ok")

    # init media manager
    MediaManager.init()
    print("MediaManager init ok")

    chnAttr1 = ChnAttrStr(encoder1.PAYLOAD_TYPE_H264, encoder1.H264_PROFILE_MAIN, width, height)
    chnAttr2 = ChnAttrStr(encoder2.PAYLOAD_TYPE_H264, encoder2.H264_PROFILE_MAIN, width, height)
    chnAttr3 = ChnAttrStr(encoder3.PAYLOAD_TYPE_H264, encoder3.H264_PROFILE_MAIN, width, height)
    print("chnAttr3 init ok")

    streamData1 = StreamData()
    streamData2 = StreamData()
    streamData3 = StreamData()

    # 创建编码器
    encoder1.Create(VENC_CHN_ID_0, chnAttr1)
    encoder2.Create(VENC_CHN_ID_1, chnAttr2)
    encoder3.Create(VENC_CHN_ID_2, chnAttr3)
    print("create encoder init ok")

    # 开始编码
    encoder1.Start(VENC_CHN_ID_0)
    encoder2.Start(VENC_CHN_ID_1)
    encoder3.Start(VENC_CHN_ID_2)
    print("start encoder init ok")

    # 启动camera
    sensor1.run()
    sensor2.run()
    sensor3.run()
    print("camera run init ok")

    frame_count = 0

    # 及时取数据,否则出问题
#    imu = Mpu6050()
#    lidar = Lidar()
    print("------ start ------");

    try:
#        switch = 0
        global switch
        while True:
            os.exitpoint()

            # Lidar处理
#            lidar_data = lidar.recv();
#            if lidar_data is not None:
#                print("lidar:", len(lidar_data))
#                lidar_server.send(lidar_data)

            # IMU处理
#            imu_data = imu.recv()
#            if imu_data is not None:
#                gyro_acc_bytes = struct.pack('<6f', *imu_data)
#                imu_server.send(gyro_acc_bytes)

            # 相机处理
            encoder1.GetStream(VENC_CHN_ID_0, streamData1) # 获取一帧码流
            for pack_idx in range(0, streamData1.pack_cnt):
                stream_data = bytes(uctypes.bytearray_at(streamData1.data[pack_idx], streamData1.data_size[pack_idx]))
                if (switch == 1):
                    camera_server.send(stream_data)
                    print("camera1");
#                print("stream[1][", frame_count, "] [",new_time-old_time1,"] size: ", streamData1.data_size[pack_idx], "stream type: ", streamData1.stream_type[pack_idx])
#                old_time1=new_time
            encoder1.ReleaseStream(VENC_CHN_ID_0, streamData1) # 释放一帧码流

            new_time = utime.ticks_us()
            encoder2.GetStream(VENC_CHN_ID_1, streamData2) # 获取一帧码流
            for pack_idx in range(0, streamData2.pack_cnt):
                stream_data = bytes(uctypes.bytearray_at(streamData2.data[pack_idx], streamData2.data_size[pack_idx]))
                if (switch == 2):
                    camera_server.send(stream_data)
                    print("camera2");
#                print("stream[2][", frame_count, "] [",new_time-old_time2,"] size: ", streamData2.data_size[pack_idx], "stream type: ", streamData2.stream_type[pack_idx])
#                old_time2=new_time
            encoder2.ReleaseStream(VENC_CHN_ID_1, streamData2) # 释放一帧码流

            new_time = utime.ticks_us()
            encoder3.GetStream(VENC_CHN_ID_2, streamData3) # 获取一帧码流
            for pack_idx in range(0, streamData3.pack_cnt):
                stream_data = bytes(uctypes.bytearray_at(streamData3.data[pack_idx], streamData3.data_size[pack_idx]))
                if (switch == 3):
                    camera_server.send(stream_data)
                    print("camera3");
#                print("stream[3][", frame_count, "] [",new_time-old_time2,"] size: ", streamData3.data_size[pack_idx], "stream type: ", streamData3.stream_type[pack_idx])
#                old_time2=new_time
            encoder3.ReleaseStream(VENC_CHN_ID_2, streamData3) # 释放一帧码流
            frame_count += 1

    except KeyboardInterrupt as e:
        print("user stop: ", e)
    except BaseException as e:
        import sys
        sys.print_exception(e)

    # 停止camera
    sensor1.stop()
    sensor2.stop()
    sensor3.stop()
    # 销毁camera和venc的绑定
    del link
    # 停止编码
    encoder.Stop(venc_chn)
    # 销毁编码器
    encoder.Destroy(venc_chn)
    # 清理buffer
    MediaManager.deinit()
    print("venc_test stop")

    #RTSP
#    rtspserver.rtspserver_stop()
#    rtspserver.rtspserver_deinit()

def nic_config():
    # 创建网络接口
    nic = network.LAN()

    # 设置DHCP模式(由你的路由器自动分配)
    nic.ifconfig(('12.12.12.100', '255.255.255.0', '12.12.12.1', '8.8.8.8'))

    # 检查连接状态
    if nic.isconnected():
        print("网络配置成功!")
        print("自动获取的配置:", nic.ifconfig())
    else:
        print("连接失败,请检查网线连接")

def test_print():
    while True:
        time.sleep(1)
        print("test_print");


if __name__ == "__main__":
    os.exitpoint(os.EXITPOINT_ENABLE)

#    _thread.start_new_thread(test_print, ())
    # 初始化网络
    nic_config()
    time.sleep(1)

    # 启动RTSP推流
    vi_bind_venc_test(1, 1280,720)  # vi绑定venc示例