问题描述
import time, os
from machine import Pin, PWM, FPIOA, TOUCH
from media.sensor import *
from media.display import *
from media.media import MediaManager
import image
DISPLAY_WIDTH, DISPLAY_HEIGHT = 800, 480
# 舵机与流量检测相关
fpioa = FPIOA()
fpioa.set_function(42, FPIOA.PWM0)
S1 = PWM(0, 50, 0, enable=True)
def set_servo_angle(angle):
min_pulse = 500
max_pulse = 2500
angle = max(0.0, min(270.0, angle))
pulse_width = min_pulse + (angle / 270.0) * (max_pulse - min_pulse)
duty_cycle = (pulse_width / 20000.0) * 100.0
S1.duty(duty_cycle)
return angle, pulse_width, duty_cycle
ROI_X, ROI_Y, ROI_WIDTH, ROI_HEIGHT = 380, 0, 40, 380
ZERO_FLOW_Y = ROI_Y + ROI_HEIGHT
MAX_FLOW_Y = ROI_Y
SCALE_START, SCALE_END = 0.16, 1.6 # 流量范围
EMA_ALPHA = 0.6
def calculate_flow_rate(y_position):
if y_position < MAX_FLOW_Y:
y_position = MAX_FLOW_Y
elif y_position > ZERO_FLOW_Y:
y_position = ZERO_FLOW_Y
position_ratio = (ZERO_FLOW_Y - y_position) / (ZERO_FLOW_Y - MAX_FLOW_Y)
flow_rate = SCALE_START + position_ratio * (SCALE_END - SCALE_START)
return flow_rate
def detect_flow(img):
roi_img = img.copy(roi=(ROI_X, ROI_Y, ROI_WIDTH, ROI_HEIGHT))
try:
roi_img = roi_img.gaussian(3)
except: pass
circles = roi_img.find_circles(
threshold=800,
x_margin=8,
y_margin=8,
r_margin=6,
r_min=1,
r_max=3
)
detected_circles = []
if circles:
for circle in circles:
x = circle.x() + ROI_X
y = circle.y() + ROI_Y
detected_circles.append((x, y, circle.r()))
return detected_circles
class PIDController:
def __init__(self, kp=60.0, ki=8.0, kd=2.0, dt=0.2, output_limit=20.0, integral_limit=100.0):
self.kp = kp; self.ki = ki; self.kd = kd; self.dt = dt
self.output_limit = output_limit
self.integral_limit = integral_limit
self._integral = 0.0
self._prev_error = 0.0
def reset(self): self._integral = 0.0; self._prev_error = 0.0
def update(self, setpoint, measurement):
error = setpoint - measurement
self._integral += error * self.dt
self._integral = min(max(self._integral, -self.integral_limit), self.integral_limit)
derivative = (error - self._prev_error) / self.dt
out = self.kp * error + self.ki * self._integral + self.kd * derivative
out = min(max(out, -self.output_limit), self.output_limit)
self._prev_error = error
return out
def load_flow_schedule(filename='/sdcard/data.txt'):
arr = []
try:
with open(filename, 'r') as f:
for line in f:
vals = line.strip().split(',')
if len(vals) == 2:
try:
flow = float(vals[0].replace('ml','').replace('L','').strip())
dur = float(vals[1].replace('s','').replace('min','').strip())
arr.append([flow, dur])
except: pass
except: pass
return arr
def save_flow_schedule(data_array, filename='/sdcard/data.txt'):
try:
with open(filename, 'w') as f:
for row in data_array:
f.write("{},{}\n".format(row[0], row[1]))
except: pass
# --- 简单数字键盘UI ----
keys = [
['1', '2', '3', '←'],
['4', '5', '6', 'Clear'],
['7', '8', '9', 'OK'],
['0', '', '', '']
]
KEY_W = 180
KEY_H = 90
KEY_START_X = 70
KEY_START_Y = 60
def draw_keyboard(img, input_text, step, data_array):
img.clear()
tip = "输入流量:" if step == 0 else "输入时间(秒):"
img.draw_string(60, 20, tip, scale=2)
img.draw_string(350, 20, input_text, scale=3)
for row in range(len(keys)):
for col in range(len(keys[row])):
key = keys[row][col]
key_x = KEY_START_X + col * KEY_W
key_y = KEY_START_Y + row * KEY_H
if key:
img.draw_rectangle(key_x, key_y, KEY_W - 10, KEY_H - 10, thickness=2, fill=True)
kw = KEY_W - 10
kh = KEY_H - 10
text_x = key_x + kw//2 - 20
text_y = key_y + kh//2 - 20
img.draw_string(text_x, text_y, key, scale=3)
# 显示保存的数据
img.draw_string(60, 450, "已保存数据:", scale=2)
show_data = data_array[-6:]
col_x = [60, 320, 560]
row_y = [410, 440]
for idx, row in enumerate(show_data):
s = "{}.流量:{} 时间:{}".format(len(data_array)-len(show_data)+idx+1, row[0], row[1])
col = idx % 3
row_idx = idx // 3
img.draw_string(col_x[col], row_y[row_idx], s, scale=2)
# 增加返回主界面按钮
img.draw_rectangle(700, 20, 80, 60, thickness=2, fill=True)
img.draw_string(710,30,"返回",scale=2)
def get_keyboard_touch(x, y):
# 按钮区域
for row in range(len(keys)):
for col in range(len(keys[row])):
key = keys[row][col]
key_x = KEY_START_X + col * KEY_W
key_y = KEY_START_Y + row * KEY_H
if key and key_x < x < key_x + KEY_W and key_y < y < key_y + KEY_H:
return key
# 判断返回主界面按钮
if 700 < x < 780 and 20 < y < 80:
return 'Back'
return None
def draw_main_gui(img, step, elapsed, total_steps, flow_val):
img.clear()
img.draw_rectangle(50, 40, 700, 380, thickness=4)
img.draw_string(320, 70, "Step {}".format(step+1), scale=4)
img.draw_string(290, 170, "流量: {:.2f}L/min".format(flow_val), scale=4)
img.draw_string(270, 270, "已累计时间: {:.1f}s".format(elapsed), scale=3)
img.draw_string(320, 350, "Set", scale=4) # Set按钮区域
def main():
MediaManager.init()
Display.init(Display.ST7701, width=DISPLAY_WIDTH, height=DISPLAY_HEIGHT, to_ide=True)
tp = TOUCH(0)
sensor0 = Sensor(id=0)
sensor0.reset()
sensor0.set_framesize(width=DISPLAY_WIDTH, height=DISPLAY_HEIGHT)
sensor0.set_pixformat(Sensor.RGB565)
sensor0.set_vflip(True)
sensor0.set_hmirror(True)
sensor0.run()
data_array = load_flow_schedule()
step = 0
step_start_ms = time.ticks_ms()
input_text = ""
input_mode = False
input_step = 0
cur_row = [None, None]
last_touch = False
current_angle = 135.0
set_servo_angle(current_angle)
pid = PIDController()
filtered_flow = 0.0
while True:
img = image.Image(DISPLAY_WIDTH, DISPLAY_HEIGHT, image.RGB888)
now_ms = time.ticks_ms()
if not input_mode:
raw_elapsed = time.ticks_diff(now_ms, step_start_ms) / 1000
snap = sensor0.snapshot(chn=0)
circles = detect_flow(snap)
measured_flow = 0.0
if circles:
x0, y0, r0 = circles[0]
measured_flow = calculate_flow_rate(y0)
filtered_flow = EMA_ALPHA * measured_flow + (1 - EMA_ALPHA) * filtered_flow
target_flow = float(data_array[step][0]) if step < len(data_array) else 0.2
if circles and step < len(data_array):
error = target_flow - filtered_flow
pid_out = pid.update(target_flow, filtered_flow)
delta = -1 * pid_out
new_angle = current_angle + delta
new_angle = max(0.0, min(270.0, new_angle))
if abs(new_angle - current_angle) >= 0.1:
current_angle = new_angle
set_servo_angle(current_angle)
if step < len(data_array) and len(data_array) > 0:
try:
step_time = float(data_array[step][1]) if data_array[step][1] else 10
except:
step_time = 10
if raw_elapsed >= step_time:
step += 1
if step >= len(data_array):
step = 0
step_start_ms = time.ticks_ms()
draw_main_gui(img, step, raw_elapsed, len(data_array), filtered_flow)
else:
draw_keyboard(img, input_text, input_step, data_array)
Display.show_image(img)
try:
import IDE
IDE.display(img)
except: pass
p = tp.read()
if p != () and not last_touch:
last_touch = True
for i in range(len(p)):
x = p[i].x
y = p[i].y
if not input_mode:
if 320 < x < 480 and 350 < y < 420: # Set按钮
input_mode = True
input_text = ""
input_step = 0
cur_row = [None, None]
else:
key = get_keyboard_touch(x, y)
if key:
if key == '←':
input_text = input_text[:-1]
elif key == 'Clear':
data_array = []
save_flow_schedule(data_array)
elif key == 'Back':
input_mode = False
step = 0
step_start_ms = time.ticks_ms()
elif key == 'OK':
if input_text == "":
continue
if input_step == 0:
cur_row[0] = input_text
input_text = ""
input_step = 1
elif input_step == 1:
cur_row[1] = input_text
input_text = ""
input_step = 0
data_array.append([cur_row[0], cur_row[1]])
save_flow_schedule(data_array)
cur_row = [None, None]
elif len(key) == 1 and key.isdigit() and len(input_text) < 6:
input_text += key
if p == ():
last_touch = False
time.sleep_ms(100)
if __name__=='__main__':
main()
两个脚本分开使用没有问题 整合后出现问题
硬件板卡
k230