重现步骤
CanMV拿三路相机的数据并编码,硬件配置:
期待结果和实际结果
长时间稳定运行,但运行不久就报错:
软硬件版本信息
错误日志
尝试解决过程
补充材料
重现步骤
CanMV拿三路相机的数据并编码,硬件配置:
期待结果和实际结果
长时间稳定运行,但运行不久就报错:
软硬件版本信息
错误日志
尝试解决过程
补充材料
好的,这个是我的使用的代码。另外单线程处理多路推流的问题我也同步改一下测试
# Video encode example
#
# Note: You will need an SD card to run this example.
#
# You can capture videos and encode them into 264/265 files
import _thread
from media.vencoder import *
from media.sensor import *
#from media.media import *
import time, os
import utime
import network, socket
import multimedia as mm
from machine import FPIOA
from machine import I2C
import select
import time, os
from machine import UART
from machine import FPIOA
import struct
class Lidar:
def __init__(self):
fpioa = FPIOA()
fpioa.set_function(50, FPIOA.UART3_TXD)
fpioa.set_function(51, FPIOA.UART3_RXD)
# 初始化UART2,波特率115200,8位数据位,无校验,1位停止位
self.uart = UART(UART.UART3, baudrate=230400, bits=UART.EIGHTBITS, parity=UART.PARITY_NONE, stop=UART.STOPBITS_ONE)
def recv(self):
return self.uart.read();
class Mpu6050:
def __init__(self):
# 实例化FPIOA
fpioa = FPIOA()
# 设置 pin 11 12 为 IIC2
fpioa.set_function(11, FPIOA.IIC2_SCL)
fpioa.set_function(12, FPIOA.IIC2_SDA)
# 获取指定功能当前所用的引脚
print(fpioa.get_pin_num(FPIOA.IIC2_SCL))
print(fpioa.get_pin_num(FPIOA.IIC2_SDA))
self.i2c2 = I2C(2, freq=100000)
a=self.i2c2.scan()
print(a)
# MPU6050 初始化
self.i2c2.writeto_mem(0x68,0x6B,bytes([0x80]),mem_size=8) # 复位 MPU6050
time.sleep(0.1)
self.i2c2.writeto_mem(0x68,0x6B,bytes([0x00]),mem_size=8) # 唤醒 MPU6050,进入正常工作状态
self.i2c2.writeto_mem(0x68,0x1B,bytes([0x18]),mem_size=8) # 设置 MPU6050 陀螺仪传感器满量程范围 正负 2000dps
self.i2c2.writeto_mem(0x68,0x1C,bytes([0x00]),mem_size=8) # 加速度传感器配置寄存器这里 正负 2g
self.i2c2.writeto_mem(0x68,0x19,bytes([0x19]),mem_size=8) # 陀螺仪采样率 50hz
self.i2c2.writeto_mem(0x68,0x1A,bytes([0x03]),mem_size=8) # 设置 MPU6050 的数字低通滤波器 50hz 0x03,42hz
self.i2c2.writeto_mem(0x68,0x38,bytes([0x00]),mem_size=8) # 关闭所有中断
self.i2c2.writeto_mem(0x68,0x6A,bytes([0x00]),mem_size=8) # I2C主模式关闭
self.i2c2.writeto_mem(0x68,0x23,bytes([0x00]),mem_size=8) # 关闭FIFO
self.i2c2.writeto_mem(0x68,0x37,bytes([0x80]),mem_size=8) # INT引脚低电平有效
id_buf = self.i2c2.readfrom_mem(0x68,0x75,1,mem_size=8) # 读取MPU6050的器件地址
if (id_buf[0] != 0x68):
print("检测不到 MPU6050 模块", hex(id_buf[0]))
sys.exit(1)
self.i2c2.writeto_mem(0x68,0x6B,bytes([0x01]),mem_size=8) # 设置CLKSEL,PLL X轴为参考
self.i2c2.writeto_mem(0x68,0x6C,bytes([0x00]),mem_size=8) # 加速度与陀螺仪都工作
self.i2c2.writeto_mem(0x68,0x19,bytes([0x19]),mem_size=8) # 陀螺仪采样率 50hz
self.i2c2.writeto_mem(0x68,0x1A,bytes([0x03]),mem_size=8) # 设置 MPU6050 的数字低通滤波器 50hz 0x03,42hz
self.gyro_buf = bytearray(6)
self.acc_buf = bytearray(6)
self.gyro_acc_data = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
def recv(self):
"""读取并解析陀螺仪和加速度计数据"""
try:
self.gyro_buf = self.i2c2.readfrom_mem(0x68,0x43,6,mem_size=8)
self.acc_buf = self.i2c2.readfrom_mem(0x68,0x3B,6,mem_size=8)
# 计算陀螺仪数据
self.gyro_acc_data[0] = (self.gyro_buf[1] + (self.gyro_buf[0] << 8)) * 0.001064
self.gyro_acc_data[1] = (self.gyro_buf[3] + (self.gyro_buf[2] << 8)) * 0.001064
self.gyro_acc_data[2] = (self.gyro_buf[5] + (self.gyro_buf[4] << 8)) * 0.001064
# 计算加速度计数据
self.gyro_acc_data[3] = (self.acc_buf[1] + (self.acc_buf[0] << 8)) / 16384
self.gyro_acc_data[4] = (self.acc_buf[3] + (self.acc_buf[2] << 8)) / 16384
self.gyro_acc_data[5] = (self.acc_buf[5] + (self.acc_buf[4] << 8)) / 16384
return self.gyro_acc_data
except Exception as e:
print(f"数据读取错误: {e}")
return None
class TCPServer:
def __init__(self, host='0.0.0.0', port=8888):
self.host = host
self.port = port
self.server_socket = None
self.client_socket = None
self.client_address = None
self.accept_thread = None
self.running = False
def set_noblock(self):
self.client_socket.setblocking(False)
def start(self):
"""启动TCP服务器"""
try:
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(1)
self.server_socket.setblocking(False)
print(f"TCP服务器已启动,监听地址: {self.host}:{self.port}")
# 启动接受连接的线程(修正后的写法)
self.running = True
# _thread.start_new_thread(self._accept_connections, ())
# return True
except Exception as e:
print(f"启动服务器失败: {e}", self.port)
return False
try:
print("--->")
client_socket, client_address = self.server_socket.accept()
print("<---")
self._close_client()
self.client_socket = client_socket
self.client_address = client_address
print(f"客户端已连接: {client_address}")
except Exception as e:
if self.running:
print(f"接受连接时出错: {e}")
time.sleep(0.1)
def _accept_connections(self):
"""在单独线程中处理客户端连接"""
while self.running:
try:
print("--->")
client_socket, client_address = self.server_socket.accept()
print("<---")
self._close_client()
self.client_socket = client_socket
self.client_address = client_address
print(f"客户端已连接: {client_address}")
except Exception as e:
if self.running:
print(f"接受连接时出错: {e}")
time.sleep(0.1)
def recv(self):
"""
非阻塞接收单个字节数据(超时时间为0)
:return: 接收到的字节数据(bytes类型),无数据时返回None
"""
if not self.client_socket:
print("无活跃的客户端连接", self.port)
return None
try:
# 使用select立即检查是否有数据可读(超时时间0秒)
# readable, _, _ = select.select([self.client_socket], [], [], 1)
# if readable:
# 有数据可读,接收1个字节
data = self.client_socket.recv(1)
if data:
return data
else:
# 接收到空数据表示客户端断开连接
print("客户端已断开连接")
self._close_client()
return None
# else:
# # 无数据可读,立即返回None
# return None
except Exception as e:
print(f"接收数据异常: {e}")
self._close_client()
return None
def send(self, data):
"""发送数据到客户端"""
# 检查是否已有客户端连接
if self.client_socket:
try:
# 发送数据头
self.client_socket.sendall(bytes([0xEB, 0x90, 0x2A, 0x00]))
# 发送时间戳
timestamp = utime.ticks_us()
timestamp_64bit = timestamp.to_bytes(8, 'little')
self.client_socket.sendall(timestamp_64bit)
# 发送数据长度
data_length = len(data)
length_bytes = data_length.to_bytes(4, 'little')
self.client_socket.sendall(length_bytes)
# 发送实际数据
self.client_socket.sendall(data)
return True
except Exception as e:
print(f"发送数据失败: {e}")
self._close_client()
return False
else:
print("没有活跃的客户端连接", self.port)
return False
def _close_client(self):
"""关闭当前客户端连接"""
if self.client_socket:
try:
self.client_socket.close()
except:
pass
self.client_socket = None
self.client_address = None
print("客户端连接已关闭")
def close(self):
"""关闭服务器和客户端连接"""
self.running = False
# 关闭服务器套接字(强制 accept() 抛出异常)
if self.server_socket:
try:
self.server_socket.close()
except:
pass
self.server_socket = None
# 等待接受线程结束
if self.accept_thread and self.accept_thread.is_alive():
self.accept_thread.join(timeout=1.0)
# 关闭客户端连接
self._close_client()
print("TCP服务器已关闭")
switch = 0;
def thread_function(name):
camera_switch = TCPServer("12.12.12.100", 9000)
camera_switch.start()
global switch
while (True):
temp_switch = camera_switch.recv()
if temp_switch is not None:
switch = temp_switch[0]
print("switch:", switch)
def vi_bind_venc_test(camera_id, width=1280, height=720):
print("venc_test start")
venc_chn = VENC_CHN_ID_0
width = ALIGN_UP(width, 16)
_thread.start_new_thread(thread_function, ("Thread-1",))
# camera_switch = TCPServer("12.12.12.100", 9000)
camera_server = TCPServer("12.12.12.100", 9001)
# imu_server = TCPServer("12.12.12.100", 9010)
# lidar_server = TCPServer("12.12.12.100", 9020)
# camera_switch.start()
# camera_switch.set_noblock()
time.sleep(1)
camera_server.start()
time.sleep(0.1)
# imu_server.start()
# time.sleep(0.1)
# lidar_server.start()
print("networ init ok")
# 初始化sensor
sensor1 = Sensor(id=0, fps=30)
sensor1.reset()
sensor2 = Sensor(id=1, fps=30)
sensor2.reset()
sensor3 = Sensor(id=2, fps=30)
sensor3.reset()
print("sensor_reset init ok")
# 设置camera 输出buffer
sensor1.set_framesize(width = width, height = height, alignment=12)
sensor1.set_pixformat(Sensor.YUV420SP)
sensor2.set_framesize(width = width, height = height, alignment=12)
sensor2.set_pixformat(Sensor.YUV420SP)
sensor3.set_framesize(width = width, height = height, alignment=12)
sensor3.set_pixformat(Sensor.YUV420SP)
print("set_pixformat init ok")
# 实例化video encoder
encoder1 = Encoder()
encoder1.SetOutBufs(VENC_CHN_ID_0, 8, width, height)
encoder2 = Encoder()
encoder2.SetOutBufs(VENC_CHN_ID_1, 8, width, height)
encoder3 = Encoder()
encoder3.SetOutBufs(VENC_CHN_ID_2, 8, width, height)
print("SetOutBufs init ok")
# 绑定camera和venc
link1 = MediaManager.link(sensor1.bind_info()['src'], (VIDEO_ENCODE_MOD_ID, VENC_DEV_ID, VENC_CHN_ID_0))
link2 = MediaManager.link(sensor2.bind_info()['src'], (VIDEO_ENCODE_MOD_ID, VENC_DEV_ID, VENC_CHN_ID_1))
link3 = MediaManager.link(sensor3.bind_info()['src'], (VIDEO_ENCODE_MOD_ID, VENC_DEV_ID, VENC_CHN_ID_2))
print("link init ok")
# init media manager
MediaManager.init()
print("MediaManager init ok")
chnAttr1 = ChnAttrStr(encoder1.PAYLOAD_TYPE_H264, encoder1.H264_PROFILE_MAIN, width, height)
chnAttr2 = ChnAttrStr(encoder2.PAYLOAD_TYPE_H264, encoder2.H264_PROFILE_MAIN, width, height)
chnAttr3 = ChnAttrStr(encoder3.PAYLOAD_TYPE_H264, encoder3.H264_PROFILE_MAIN, width, height)
print("chnAttr3 init ok")
streamData1 = StreamData()
streamData2 = StreamData()
streamData3 = StreamData()
# 创建编码器
encoder1.Create(VENC_CHN_ID_0, chnAttr1)
encoder2.Create(VENC_CHN_ID_1, chnAttr2)
encoder3.Create(VENC_CHN_ID_2, chnAttr3)
print("create encoder init ok")
# 开始编码
encoder1.Start(VENC_CHN_ID_0)
encoder2.Start(VENC_CHN_ID_1)
encoder3.Start(VENC_CHN_ID_2)
print("start encoder init ok")
# 启动camera
sensor1.run()
sensor2.run()
sensor3.run()
print("camera run init ok")
frame_count = 0
# 及时取数据,否则出问题
# imu = Mpu6050()
# lidar = Lidar()
print("------ start ------");
try:
# switch = 0
global switch
while True:
os.exitpoint()
# Lidar处理
# lidar_data = lidar.recv();
# if lidar_data is not None:
# print("lidar:", len(lidar_data))
# lidar_server.send(lidar_data)
# IMU处理
# imu_data = imu.recv()
# if imu_data is not None:
# gyro_acc_bytes = struct.pack('<6f', *imu_data)
# imu_server.send(gyro_acc_bytes)
# 相机处理
encoder1.GetStream(VENC_CHN_ID_0, streamData1) # 获取一帧码流
for pack_idx in range(0, streamData1.pack_cnt):
stream_data = bytes(uctypes.bytearray_at(streamData1.data[pack_idx], streamData1.data_size[pack_idx]))
if (switch == 1):
camera_server.send(stream_data)
print("camera1");
# print("stream[1][", frame_count, "] [",new_time-old_time1,"] size: ", streamData1.data_size[pack_idx], "stream type: ", streamData1.stream_type[pack_idx])
# old_time1=new_time
encoder1.ReleaseStream(VENC_CHN_ID_0, streamData1) # 释放一帧码流
new_time = utime.ticks_us()
encoder2.GetStream(VENC_CHN_ID_1, streamData2) # 获取一帧码流
for pack_idx in range(0, streamData2.pack_cnt):
stream_data = bytes(uctypes.bytearray_at(streamData2.data[pack_idx], streamData2.data_size[pack_idx]))
if (switch == 2):
camera_server.send(stream_data)
print("camera2");
# print("stream[2][", frame_count, "] [",new_time-old_time2,"] size: ", streamData2.data_size[pack_idx], "stream type: ", streamData2.stream_type[pack_idx])
# old_time2=new_time
encoder2.ReleaseStream(VENC_CHN_ID_1, streamData2) # 释放一帧码流
new_time = utime.ticks_us()
encoder3.GetStream(VENC_CHN_ID_2, streamData3) # 获取一帧码流
for pack_idx in range(0, streamData3.pack_cnt):
stream_data = bytes(uctypes.bytearray_at(streamData3.data[pack_idx], streamData3.data_size[pack_idx]))
if (switch == 3):
camera_server.send(stream_data)
print("camera3");
# print("stream[3][", frame_count, "] [",new_time-old_time2,"] size: ", streamData3.data_size[pack_idx], "stream type: ", streamData3.stream_type[pack_idx])
# old_time2=new_time
encoder3.ReleaseStream(VENC_CHN_ID_2, streamData3) # 释放一帧码流
frame_count += 1
except KeyboardInterrupt as e:
print("user stop: ", e)
except BaseException as e:
import sys
sys.print_exception(e)
# 停止camera
sensor1.stop()
sensor2.stop()
sensor3.stop()
# 销毁camera和venc的绑定
del link
# 停止编码
encoder.Stop(venc_chn)
# 销毁编码器
encoder.Destroy(venc_chn)
# 清理buffer
MediaManager.deinit()
print("venc_test stop")
#RTSP
# rtspserver.rtspserver_stop()
# rtspserver.rtspserver_deinit()
def nic_config():
# 创建网络接口
nic = network.LAN()
# 设置DHCP模式(由你的路由器自动分配)
nic.ifconfig(('12.12.12.100', '255.255.255.0', '12.12.12.1', '8.8.8.8'))
# 检查连接状态
if nic.isconnected():
print("网络配置成功!")
print("自动获取的配置:", nic.ifconfig())
else:
print("连接失败,请检查网线连接")
def test_print():
while True:
time.sleep(1)
print("test_print");
if __name__ == "__main__":
os.exitpoint(os.EXITPOINT_ENABLE)
# _thread.start_new_thread(test_print, ())
# 初始化网络
nic_config()
time.sleep(1)
# 启动RTSP推流
vi_bind_venc_test(1, 1280,720) # vi绑定venc示例