关于三摄像头AI识别物体的问题2

Viewed 47

问题描述


在先前大佬的指点下,我完成了庐山派K230的三个摄像头的配置,但是在实现过程中出现了一些问题,当我尝试运行代码时,发现K230刚开始运行就与电脑断连了(后面经过排查排除了硬件问题),推测有可能是K230的内存不够导致的,我的代码的主要任务如下:
1、我需要用摄像头1来识别“me”、“you”、“he”三种物体,并且支持两种观察模式(一种是三种物体同时看到并以从左到右(按识别到的数据x从小到大)的形式来排序成信息用串口返回,另一种是挨个挨个看,并将看到的种类按特定数字用串口返回);
2、摄像头二用于识别单一的物体并将该物体以特定数字用串口返回;
3、摄像头3和摄像头1类似,来识别‘1’,‘2’,‘3’,‘4’,‘5’这些数字,并且支持两种观察模式(一种是五个数字同时看到并以从左到右(按数据x从小到大)的形式来排序成信息用串口返回,另一种是挨个挨个看,并将看到的种类按特定数字用串口返回);
然后所有摄像头的运行顺序是:显示拍摄-》显示-》如果收到指令开始ai识别(循环开始)-》显示拍摄、ai拍摄-》ai处理-》返回信息并以显示的比例在lcd屏上画对应的框-》收到结束指令(循环结束)

我的代码如下:

from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import os,sys,gc,time,random,utime,urandom, math
import ujson
from media.media import *
from time import *
import nncase_runtime as nn
import ulab.numpy as np
import image,aidemo,aicube
from media.sensor import *
from media.display import *
from machine import Pin,FPIOA,UART
#import time, os, sys
from libs.PipeLine import ScopedTiming
from libs.Utils import *

#配置串口
fpioa = FPIOA()
fpioa.set_function(50, FPIOA.UART3_TXD)
fpioa.set_function(51, FPIOA.UART3_RXD)
fpioa.set_function(53, FPIOA.GPIO53)

#对分辨率进行一些配置
display_width = 800
display_height = 480
uart = UART(UART.UART3, baudrate=115200, bits=UART.EIGHTBITS, parity=UART.PARITY_NONE, stop=UART.STOPBITS_ONE)

OUT_RGB888P_WIDTH = ALIGN_UP(640, 16)
OUT_RGB888P_HEIGHT = 360

# 显示模式选择
DISPLAY_MODE = "LCD"

picture_width = 800
picture_height = 480

if DISPLAY_MODE == "LCD":
    DISPLAY_WIDTH = 800
    DISPLAY_HEIGHT = 480


class_1_map = {
    "one": 0x01,
    "two": 0x02,
    "three": 0x03,
    "four": 0x04,
    "five": 0x05,
    "greenpee": 0x06,
    "yellowpee": 0x07,
    "whitepee": 0x08
}

#分辨率比例转换,这里用于将lcd屏幕的800*480转换为适用于YOLO的分辨率640
def two_side_pad_param(input_size, output_size):
    ratio_w = output_size[0] / input_size[0]  # 宽度缩放比例
    ratio_h = output_size[1] / input_size[1]  # 高度缩放比例
    ratio = min(ratio_w, ratio_h)  # 取较小的缩放比例
    new_w = int(ratio * input_size[0])  # 新宽度
    new_h = int(ratio * input_size[1])  # 新高度
    dw = (output_size[0] - new_w) / 2  # 宽度差
    dh = (output_size[1] - new_h) / 2  # 高度差
    top = int(round(dh - 0.1))
    bottom = int(round(dh + 0.1))
    left = int(round(dw - 0.1))
    right = int(round(dw + 0.1))
    return top, bottom, left, right, ratio

def split_coordinates(value):
    """
    将一个16位数值拆分为高字节和低字节
    value: 要拆分的16位整数值
    return: (high_byte, low_byte) 元组
    """
    high_byte = (value >> 8) & 0xFF  # 右移8位获取高字节
    low_byte = value & 0xFF          # 与0xFF按位与获取低字节
    return high_byte, low_byte

# 自定义YOLO检测类,继承自AIBase基类
class YOLOv11App(AIBase):
    def __init__(self, kmodel_path, model_input_size, anchors, confidence_threshold=0.8, nms_threshold=0.2, rgb888p_size = [640, 360], display_size=[800,480], debug_mode=0):
        super().__init__(kmodel_path, model_input_size, rgb888p_size, debug_mode)  # 调用基类的构造函数
        self.class_id = ["me", "you", "he", "one", "two", "three", "four", "five"]
        self.kmodel_path = kmodel_path  # 模型文件路径
        self.model_input_size = model_input_size  # 模型输入分辨率
        self.confidence_threshold = confidence_threshold  # 置信度阈值
        self.nms_threshold = nms_threshold  # NMS(非极大值抑制)阈值
        self.anchors = anchors  # 锚点数据,用于目标检测
        self.rgb888p_size = [ALIGN_UP(rgb888p_size[0], 16), rgb888p_size[1]]  # sensor给到AI的图像分辨率,并对宽度进行16的对齐
        self.display_size = [ALIGN_UP(display_size[0], 16), display_size[1]]  # 显示分辨率,并对宽度进行16的对齐
        self.debug_mode = debug_mode  # 是否开启调试模式
        self.ai2d = Ai2d(debug_mode)  # 实例化Ai2d,用于实现模型预处理
        self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT, nn.ai2d_format.NCHW_FMT, np.uint8, np.uint8)  # 设置Ai2d的输入输出格式和类型
        self.last_result = None
        self.stable_count = 0

    # 配置预处理操作,这里使用了pad和resize,Ai2d支持crop/shift/pad/resize/affine,具体代码请打开/sdcard/app/libs/AI2D.py查看
    def config_preprocess(self, input_image_size=None):
        with ScopedTiming("set preprocess config", self.debug_mode > 0):  # 计时器,如果debug_mode大于0则开启
            ai2d_input_size = input_image_size if input_image_size else self.rgb888p_size  # 初始化ai2d预处理配置,默认为sensor给到AI的尺寸,可以通过设置input_image_size自行修改输入尺寸
            top, bottom, left, right = self.get_padding_param()  # 获取padding参数
            print("padding: {} {} {} {}".format(top, bottom, left, right))
            self.ai2d.pad([0, 0, 0, 0, top, bottom, left, right], 0, [104, 117, 123])  # 填充边缘
            self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)  # 缩放图像
            self.ai2d.build([1,3,ai2d_input_size[1],ai2d_input_size[0]],[1,3,self.model_input_size[1],self.model_input_size[0]])  # 构建预处理流程

    # 自定义当前任务的后处理,results是模型输出array列表
    def postprocess(self, results):
        counter = 0
        det_res = []
        with ScopedTiming("postprocess", self.debug_mode > 0):
            # 输出形状为[1, 1, 14, 2100]
            # 意思是,输出了2100个框,每个框有14个数据,其中前4个数据是xywh,后面10个是每个类别对应的置信度,这里的xy指的是中心点坐标
            for i in range(2100):
                result = results[0][0][:, i]
                max_score = max(result[4:])
                if max_score > self.confidence_threshold:
                    # 这里把位置信息恢复到1920 * 1080画布下的状态
                    x = result[0] * max(self.rgb888p_size) / max(self.model_input_size)
                    y = result[1] * max(self.rgb888p_size) / max(self.model_input_size)
                    w = result[2] * max(self.rgb888p_size) / max(self.model_input_size)
                    h = result[3] * max(self.rgb888p_size) / max(self.model_input_size)
                    det_res.append([x, y, w, h, list(result[4:]).index(max_score), max_score])
            # 这里还需要一个NMS处理,因为,套框问题比较严重,但是我这里暂时没空写这个东西了,于是简单的按照,每个类别只检测一个物体,按这个逻辑来写
            # 后续可能会把NMS也写在这里,现在暂时不写了
            det_res.sort(key=lambda x:x[-1], reverse=True)
            det_res_single = []
            added_class = []
            for result in det_res:
                if not result[-2] in added_class:
                    added_class.append(result[-2])
                    det_res_single.append(result)
        return det_res_single

    # 绘制检测结果到画面上
    def draw_result(self, pl, dets):
        with ScopedTiming("display_draw", self.debug_mode > 0):
            if dets:
                pl.osd_img.clear()  # 清除OSD图像
                for det in dets:
                    # 将检测框的坐标转换为显示分辨率下的坐标
                    x, y, w, h = map(lambda x: int(round(x, 0)), det[:4])
                    x = x * self.display_size[0] // self.rgb888p_size[0]
                    y = y * self.display_size[1] // self.rgb888p_size[1]
                    w = w * self.display_size[0] // self.rgb888p_size[0]
                    h = h * self.display_size[1] // self.rgb888p_size[1]
                    pl.osd_img.draw_rectangle(x - w//2, y - h // 2, w, h, color=(255, 0, 255, 0), thickness=2)  # 绘制矩形框
                    pl.osd_img.draw_string_advanced(x - w//2, y - h // 2, 80, "{} {}".format(self.class_id[det[-2]], round(det[-1], 2)), color=(255, 0, 255, 0)) # 画标签和置信度
            else:
                pl.osd_img.clear()

    # 对检测结果进行一个大致的检测
    def draw_number_sort(self,dets , sensor_number):
        for det in dets:
                conf = det[5]
                # 过滤置信度 < 0.80 的目标(核心过滤逻辑)
                if conf < 0.80:
                    continue  # 跳过低置信度目
        res_sorted = sorted(dets, key=lambda x: x[0])
        send_data = []
        for obj in res_sorted:
            class_idnum = obj[4]
            cls_name = self.class_id[int(class_idnum)]
            if cls_name in class_1_map:
                send_data.append(cls_name)
                send_data = send_data[:5]
                MA = bytearray([sensor_number, 0x00, send_data[0], send_data[1], send_data[2], send_data[3], send_data[4], 0x6B])

        return send_data

    # 获取padding参数
    def get_padding_param(self):
        dst_w = self.model_input_size[0]  # 模型输入宽度
        dst_h = self.model_input_size[1]  # 模型输入高度
        ratio_w = dst_w / self.rgb888p_size[0]  # 宽度缩放比例
        ratio_h = dst_h / self.rgb888p_size[1]  # 高度缩放比例
        ratio = min(ratio_w, ratio_h)  # 取较小的缩放比例
        new_w = int(ratio * self.rgb888p_size[0])  # 新宽度
        new_h = int(ratio * self.rgb888p_size[1])  # 新高度
        dw = (dst_w - new_w) / 2  # 宽度差
        dh = (dst_h - new_h) / 2  # 高度差
        top = int(round(0))
        bottom = int(round(dh * 2 + 0.1))
        left = int(round(0))
        right = int(round(dw * 2 - 0.1))
        return top, bottom, left, right

    def tell_you_the_thing(self, dets, sensor_number):
        if not dets:
            return None
        # ===== 选置信度最高的目标 =====
        det = max(dets, key=lambda x: x[5])
        x, y, w, h, class_id, score = det
        # ===== 过滤置信度 =====
        if score < 0.85:
            return None
        # ===== 获取类别名 =====
        cls_name = self.class_id[int(class_id)]
        # ===== 防抖 =====
        if cls_name is self.last_result:
            self.stable_count += 1
        else:
            self.stable_count = 0

        self.last_result = cls_name

        if self.stable_count < 2:
            return None# 不稳定,不发送

        # ===== 映射 =====
        if cls_name not in class_1_map:
            return None
        cls_code = class_1_map[cls_name]
        # ===== 串口发送 =====
        MA = bytearray([sensor_number,0x00, 0x00, 0x00, 0x00, 0x00,cls_code,0x6D])
        uart.write(MA)
        return cls_code

    def sensor_control(self,sensor):
        img_ai = sensor.snapshot(chn=2)
        img_display = sensor.snapshot(chn=0)
        return img_display, img_ai

if __name__ == "__main__":
    sensor0 = Sensor(id=0)
    sensor0.reset()
    #显示通道给lcd
    sensor0.set_framesize(width=800, height=480, chn=CAM_CHN_ID_0)
    sensor0.set_pixformat(Sensor.RGB565, chn=CAM_CHN_ID_0)
#    #AI通道给yolo
    sensor0.set_framesize(width=OUT_RGB888P_WIDTH, height=OUT_RGB888P_HEIGHT, chn=CAM_CHN_ID_2)
    sensor0.set_pixformat(Sensor.RGBP888, chn=CAM_CHN_ID_2)
    sensor0.set_hmirror(True)
    sensor0.set_vflip(True)

    sensor1 = Sensor(id=1)
    sensor1.reset()
    #显示通道给lcd
    sensor1.set_framesize(width=800, height=480, chn=CAM_CHN_ID_0)
    sensor1.set_pixformat(Sensor.RGB565, chn=CAM_CHN_ID_0)
    #AI通道给yolo
    sensor1.set_framesize(width=OUT_RGB888P_WIDTH, height=OUT_RGB888P_HEIGHT, chn=CAM_CHN_ID_2)
    sensor1.set_pixformat(Sensor.RGBP888, chn=CAM_CHN_ID_2)
    sensor1.set_hmirror(True)
    sensor1.set_vflip(True)

    sensor2 = Sensor(id=2)
    sensor2.reset()
    #显示通道给lcd
    sensor2.set_framesize(width=800, height=480, chn=CAM_CHN_ID_0)
    sensor2.set_pixformat(Sensor.RGB565, chn=CAM_CHN_ID_0)
    #AI通道给yolo
    sensor2.set_framesize(width=OUT_RGB888P_WIDTH, height=OUT_RGB888P_HEIGHT, chn=CAM_CHN_ID_2)
    sensor2.set_pixformat(Sensor.RGBP888, chn=CAM_CHN_ID_2)
    sensor2.set_hmirror(True)
    sensor2.set_vflip(True)

    Display.init(Display.ST7701, width=800, height=480, to_ide= True)
    clock = time.clock()
    # 初始化媒体管理器
    MediaManager.init()
    # 启动传感器
    sensor0.run()
    sensor1.run()
    sensor2.run()
    # 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
    display_mode="lcd"
    # k230保持不变,k230d可调整为[640,360]
    rgb888p_size = [1920, 1080]

    if display_mode=="hdmi":
        display_size=[1920,1080]
    else:
        display_size=[800,480]
    # 设置模型路径和其他参数
    kmodel_path = "/sdcard/best.kmodel"
    # 其它参数
    confidence_threshold = 0.8
    nms_threshold = 0.2
    anchor_len = None
    det_dim = 4
    anchors_path = None
    anchors = None
    anchors = None
    # 初始化自定义物体检测实例
    yolo_det = YOLOv11App(kmodel_path, model_input_size=[320, 320], anchors=anchors, confidence_threshold=confidence_threshold, nms_threshold=nms_threshold, rgb888p_size=rgb888p_size, display_size=display_size, debug_mode=0)
    yolo_det.config_preprocess()  # 配置预处理

    mode = 0
    current_sensor = sensor0

    try:
        while True:
            os.exitpoint()                      # 检查是否有退出信号
            #*****************************************************************************************************
            clock.tick()
            time.sleep_ms(10)
            # 显示实时帧数
            # 串口数据接收处理sensors
            data = uart.read()
            #print(list_flag)
#               if list_flag is None:
#                    continue
            print(f"接收到数据: {data}")

    # ===== 串口读取 =====
            if data and len(data) == 8:
                uart_flag = data
                #根据不同的要求、场景来选择不同的摄像头
                if uart_flag == b'\x01\x01\x00\x00\x00\x00\x00\x00':
                    mode = 1
                    current_sensor = sensor0

                elif uart_flag == b'\x01\x02\x00\x00\x00\x00\x00\x00':
                    mode = 2
                    current_sensor = sensor0

                elif uart_flag == b'\x02\x00\x00\x00\x00\x00\x00\x00':
                    mode = 3
                    current_sensor = sensor1

                elif uart_flag == b'\x03\x01\x00\x00\x00\x00\x00\x00':
                    mode = 4
                    current_sensor = sensor2
                elif uart_flag == b'\x03\x02\x00\x00\x00\x00\x00\x00':
                    mode = 5
                    current_sensor = sensor2

            # ===== 统一处理 =====
            img_display, img_ai = yolo_det.sensor_control(current_sensor)
            dets = yolo_det.run(img_ai)

            if mode == 1:
                yolo_det.tell_you_the_thing(dets, 0x01)

            elif mode == 2:
                yolo_det.draw_number_sort(dets,0x01)

            elif mode == 3:
                yolo_det.tell_you_the_thing(dets, 0x02)

            elif mode == 4:
                yolo_det.tell_you_the_thing(dets, 0x03)

            elif mode == 5:
                yolo_det.draw_number_sort(dets,0x03)

            yolo_det.draw_result(img_display, dets)
            Display.show_image(img_display, x=int((DISPLAY_WIDTH - picture_width) / 2), y=int((DISPLAY_HEIGHT - picture_height) / 2))


            list_flag_1 = uart.read()
            if list_flag_1 == b'\x06\x00\x00\x00\x00\x00\x00\x00':
                break
            #************************************************************************************************************
    except Exception as e:
        print(e)                  # 打印异常信息
    finally:
        yolo_det.deinit()                       # 反初始化

复现步骤


硬件板卡


庐山派K230

软件版本


CanMV_K230_LCKFB_micropython_v1.5-legacy-0-g413737f_nncase_v2.9.0

1 Answers

庐山派的内存应该是够得,你这个也不是特别复杂,可以把一些功能模块注掉,看看注掉哪一个模块的时候就可以正常运行,说明就是这个模块有问题