问题描述
无论是有线或是无线方式,获取IP联网部分好用,但是建立TCP通讯时,重新运行代码就会报错,是因为上一次的代码中操作模块后建立的连接还存在吗,怎么彻底清除保证最新一次连接成功,当前只能彻底断电后才能再次好使
报错种类1:socket_socket() -> errno 12
报错种类2:run connect failed.
[ERROR] 初始化连接时发生未知错误: [Errno 12] ENOMEM
import network
import socket
import struct
import time
import os
import sys
import json
from machine import RTC
from ubinascii import hexlify # 将字节串(bytes)转换为十六进制字符串(hex string)
import network
import asyncio
## ---------------【Net基类参数】----------------------------
class NetBase:
def __init__(self, wireless=True):
"""
初始化 NetBase 对象
"""
self.wireless = wireless # 是否为无线网络
self.mac = '' # 连接成功后的 MAC 地址
self.ip = '' # 连接成功后的 IP 地址
self.netmask = '' # 连接成功后的子网掩码
self.gateway = '' # 连接成功后的网关地址
self.dns = '' # 连接成功后的 DNS 服务器地址
self.net = None # 网络对象,有线无线都叫这个
# 激活 WLAN 模块,此功能不需要了,是常开
# self.net.active(True) # Network (rt-smart) is always active and cannot be disabled.
@property
def isconnected(self):
"""
获取连接状态
:return: 连接状态,成功返回 True,失败返回 False
"""
if self.net is None:
print("未初始化网络对象")
return False
# print(f"连接状态:{self.net.isconnected()}")
return self.net.isconnected()
def internet(self, test_address=('www.baidu.com', 80), timeout=3):
"""
尝试域名解析访问外部网址
:param test_address: 用于测试的服务器地址和端口,默认为百度服务器
:param timeout: 连接超时时间(秒)
:return: 可以访问返回 True,不能访问返回 False
"""
try:
addr_info = socket.getaddrinfo(*test_address) # 解包+解析
# [(2, 1, 0, '', ('110.242.69.21', 80))]
ip = addr_info[0][4][0]
port = addr_info[0][4][1]
# 创建套接字
s = socket.socket()
# 设置超时时间
s.settimeout(timeout)
# 尝试连接测试服务器
s.connect((ip, port))
s.close()
print(f'测试连接正常: {ip}:{port}')
return True
except Exception as e:
print(f'未知错误: {e}')
return False
def get_config(self):
"""
获取当前网络配置信息
:return: 包含网络配置的字典
"""
self.mac = self.net.config('mac')
self.ip, self.netmask, self.gateway, self.dns = self.net.ifconfig()
link_type = 'WLAN/WIFI' if self.wireless else 'LAN/Ethernet' # 网络类型
config = {
'connected': self.isconnected,
'mac': self.mac,
'ip': self.ip,
'netmask': self.netmask,
'gateway': self.gateway,
'dns': self.dns,
'link_type': link_type,
}
if self.wireless:
config.update({
'ssid': self.ssid,
})
return config
## ---------------【基础-WIFI】----------------------------
class WIFI(NetBase):
def __init__(self):
# 显式调用父类初始化
super().__init__()
"""
初始化 WIFI 对象
:param ssid: 无线网络名称
:param password: 无线网络密码
:param timeout: 连接超时时间(秒)
"""
self.net = network.WLAN(network.STA_IF) # 创建 WLAN 对象并设置为 STA 模式
self.ssid = '' # 无线网络名称
self.password = '' # 无线网络密码
self.timeout = 0 # 连接超时时间
def __str__(self):
"""
返回 WIFI 连接状态的字符串表示
:return: 包含 WIFI 连接状态的字符串
"""
return f"WIFI(ssid={self.ssid}, ip={self.ip})"
def init(self, ssid='YJOT-2.4G', password='yijia0516', timeout=10):
"""
初始化并连接 WIFI
:return: 连接结果,成功返回 True,失败返回 False
"""
self.ssid = ssid # 无线网络名称
self.password = password # 无线网络密码
self.timeout = timeout # 连接超时时间
# 检查是否已连接
if not self.net.isconnected():
# 扫描周围网络名称是否存在目标WIFI
found = False
print(f'扫描网络中...{self.ssid}')
networks = self.net.scan() # 使用正确的net对象进行扫描
for item in networks: #item -> rt_wlan_info字典,包含bytes信息
ssid = item.ssid # 不同平台的返回格式可能不同,使用索引更兼容?
if self.ssid.encode('utf-8') == ssid:
found = True
print(f'找到目标网络: {self.ssid}')
break
else:
print(f'目标网络不匹配: {self.ssid}')
if not found:
print(f'未找到目标网络: {self.ssid}')
return False
# 连接网络
self.net.connect(self.ssid, self.password)
# 等待连接完成或超时
start_time = time.time()
while not self.net.isconnected() and (time.time() - start_time) < self.timeout:
time.sleep(0.1)
# 检查连接状态
if self.net.isconnected():
print('连接成功')
return True
else:
print('连接超时')
return False
else:
print('已连接')
self.ip = self.net.ifconfig()[0]
print(f'IP地址: {self.ip}')
return True
# 调用示例
net = WIFI()
net.init("YJOT","yijia0516")
print(net.get_config())
net.internet()
## ---------------【基础-以太网】----------------------------
class Ethernet(NetBase):
def __init__(self):
"""
初始化以太网对象
"""
# ✅ 显式调用父类初始化
super().__init__(wireless=False)
self.net = network.LAN() # 创建 LAN 对象并设置为 LAN 模式
self.cus_ipconfig = None # 自定义IP配置 (ip, netmask, gateway, dns)
def init(self, ip=None, netmask=None, gateway=None, dns=None):
"""
初始化以太网
:param ip: IP 地址
:param netmask: 子网掩码
:param gateway: 网关
:param dns: DNS 服务器
:return: 初始化结果,成功返回 True,失败返回 False
"""
# 集体判断:all() 函数会判断 列表里的所有元素是否都为真,如果都为真,则返回True,否则返回False
self.cus_ipconfig = (ip, netmask, gateway, dns) if all([ip, netmask, gateway, dns]) else None
if self.cus_ipconfig: # 指定就配置
self.net.ifconfig(self.cus_ipconfig)
self.get_config()
if ip != self.ip:
print(f"以太网配置失败,当前IP地址为{self.ip}")
return False
if self.isconnected:
self.get_config()
if self.ip == "0.0.0.0" and self.netmask == "0.0.0.0" and self.gateway == "0.0.0.0" and self.dns == "0.0.0.0":
print(f"以太网配置失败,当前IP地址为{self.ip},请检查网线是否插入")
return False
else:
return True
else:
print(f"以太网配置失败,当前IP地址为{self.ip},请检查网线是否插入")
return False
# # # 调用示例
# net = Ethernet()
# net.init()
# print(net.get_config())
# net.internet()
# print("IP:",net.ip)
## ---------------【LAN统一管理类】----------------------------
class NetworkManager(NetBase):
def __init__(self, wireless=True, ssid=None, password=None, timeout=10,
ip=None, netmask=None, gateway=None, dns=None):
"""
初始化网络管理器(统一 WiFi / LAN)
:param wireless: 是否使用无线网络
:param ssid: WiFi 名称(无线模式下必须)
:param password: WiFi 密码(无线模式下必须)
:param timeout: WiFi 连接超时
:param ip: LAN 静态 IP
:param netmask: LAN 子网掩码
:param gateway: LAN 网关
:param dns: LAN DNS
"""
super().__init__(wireless=wireless)
# 有线 / 无线对象初始化
if wireless:
if not ssid or not password:
raise ValueError("无线模式下必须提供 ssid 和 password")
self.net = WIFI()
self.ssid = ssid
self.password = password
self.timeout = timeout
else:
self.net = Ethernet()
self.cus_ipconfig = (ip, netmask, gateway, dns) if all([ip, netmask, gateway, dns]) else None
def init(self):
"""
初始化并连接网络(WiFi / LAN)
:return: 连接成功返回 True,失败返回 False
"""
if self.wireless:
return self.net.init(ssid=self.ssid, password=self.password, timeout=self.timeout)
else:
if self.cus_ipconfig:
return self.net.init(*self.cus_ipconfig)
else:
return self.net.init()
@property
def isconnected(self):
return self.net.isconnected
def internet(self, test_address=('www.baidu.com', 80), timeout=3):
return self.net.internet(test_address, timeout)
def get_config(self):
config = self.net.get_config()
return config
# # 调用示例
# # WiFi模式
# network = NetworkManager(wireless=True, ssid='YJOT-2.4G', password='yijia0516', timeout=10)
# # 以太网模式
# # network = NetworkManager(wireless=False, ip='192.168.58.100', netmask='255.255.255.0', gateway='192.168.58.1', dns='192.168.58.1')
# # 初始化
# network.init()
# # DNS解析
# network.internet()
# # 配置查看
# print(f"配置:{network.get_config()}")
## ---------------【TCP服务端 - 有问题接不到】----------------------------
class TcpServer:
def __init__(self, host='0.0.0.0', port=8081):
self.host = host
self.port = port
self.sock = None
def handle_client(self, client_sock, client_addr):
"""处理客户端连接"""
try:
# 接收4字节文件大小头部
raw_header = self.recvall(client_sock, 4)
if not raw_header:
print("[WARN] 没收到长度信息")
return
filesize = struct.unpack("!I", raw_header)[0]
print(f"[INFO] 正在接收 {filesize} 字节数据")
# 接收实际数据
data = self.recvall(client_sock, filesize)
if not data:
print("[ERROR] 未收到数据")
return
# 打印接收到的完整数据
print(f"[INFO] 接收到的数据:{data}")
print(f"[INFO] 数据的总长度:{len(data)} 字节")
except Exception as e:
print(f"[ERROR] 客户端 {client_addr} 处理失败: {e}")
finally:
try:
client_sock.close()
print(f"[INFO] 已关闭连接 {client_addr}")
except Exception as e:
print(f"[ERROR] 关闭连接失败: {e}")
def recvall(self, sock, length):
"""辅助函数:确保接收固定长度数据"""
data = b''
try:
while len(data) < length:
chunk = sock.recv(length - len(data))
if not chunk:
print("[WARN] 未收到更多数据,连接可能关闭")
return None
data += chunk
except socket.error as e:
print(f"[ERROR] 接收数据出错: {e}")
return None
return data
# # 调用示例
# net = NetworkManager(wireless=False) # Ethernet()
# net.init()
# print(net.get_config())
# net.internet()
# print("IP:",net.ip)
# # 启动服务器
# if __name__ == '__main__':
# server = TcpServer(host=net.ip,port=8081)
# # 启动服务器监听
# server.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# try:
# server.sock.bind((server.host, server.port))
# server.sock.listen(5)
# print(f"[INFO] TCP 服务器已启动,监听 {server.host}:{server.port}")
# except OSError as e:
# print(f"[ERROR] 无法绑定端口 {server.port}: {e}")
# # exit(1)
# try:
# while True:
# # 接收一个客户端连接
# client_sock, client_addr = server.sock.accept()
# print(f"[INFO] 接收到客户端连接 {client_addr}")
# # 处理客户端连接(阻塞处理)
# server.handle_client(client_sock, client_addr)
# except Exception as e:
# print(f"[ERROR] 服务器出现异常: {e}")
# finally:
# try:
# server.sock.close()
# print("[INFO] 服务器关闭")
# except Exception as e:
# print(f"[ERROR] 关闭服务器失败: {e}")
## ---------------【TCP客户端】----------------------------
class TcpClient:
def __init__(self, host="101.35.21.193", port=13719, keep_alive=True, timeout=5):
"""
初始化 TCP 客户端
:param host: 服务器的地址
:param port: 服务器的端口
:param keep_alive: 是否保持长连接,默认为 True
:param timeout: 连接超时时间,默认为 5 秒
"""
self.host = host
self.port = port
self.keep_alive = keep_alive
self.timeout = timeout
self.sock = None
self.buff = bytearray()
def _create(self):
"""创建并配置 socket"""
# 若旧socket还存在则关闭
if self.sock:
print(f"[INFO] 关闭旧连接 {self.host}:{self.port}")
self.close()
s = socket.socket()
s.settimeout(self.timeout)
return s
def init(self):
"""初始化连接"""
self.close() # 确保旧连接彻底关闭
# gc.collect() # 主动清理旧 socket 内存
try:
self.sock = self._create()
print(f"[INFO] 尝试连接到 {self.host}:{self.port}")
self.sock.connect((self.host, self.port))
if self.keep_alive:
self.sock.settimeout(None) # 保持长连接,设置无超时
print(f"[INFO] 成功连接到 {self.host}:{self.port}")
return True
except Exception as e:
print(f"[ERROR] 初始化连接时发生未知错误: {e}")
self.sock = None
return False
def recv(self, size=1024):
"""接收数据"""
if not self.sock:
# print("[WARN] 无连接,无法接收")
return b""
try:
data = self.sock.recv(size)
return data
except Exception as e:
print(f"[ERROR] 接收失败: {e}")
self.close()
return b""
def send_data(self, data):
"""
发送字节数据到服务器
:param data: 发送的字节数据
"""
if not self.sock:
print("[WARN] 无连接,尝试初始化连接")
return False
# self.init() # 如果连接未建立,先建立连接
if self.sock:
try:
filesize = len(data)
header = struct.pack("!I", filesize) # 包含文件大小的头部
self.sock.sendall(header) # 发送文件头(文件大小)
self.sock.sendall(data) # 发送数据
print("[INFO] 数据发送完成")
except Exception as e:
print(f"[ERROR] 发送数据失败: {e}")
else:
print("[ERROR] 连接不可用,无法发送数据")
def send_image(self, image_path):
"""
发送图片数据
:param image_path: 图片的文件路径
"""
try:
with open(image_path, 'rb') as f:
img_data = f.read()
self.send_data(img_data)
print(f"[INFO] 图片 {image_path} 发送完成")
except Exception as e:
print(f"[ERROR] 读取图片失败: {e}")
def close(self):
"""安全关闭连接"""
if self.sock:
try:
self.sock.close()
print("[INFO] 连接已关闭")
except Exception as e:
print(f"[WARN] 关闭连接异常: {e}")
finally:
self.sock = None
gc.collect() # 强制释放底层socket内存
else:
print("[ERROR] 连接未建立,无法关闭")
# 调用示例
# net = WIFI()
# net.init("YJOT","yijia0516")
# # net = NetworkManager(wireless=False)
# # net.init()
# print(net.get_config())
# net.internet()
# print("IP:",net.ip)
if __name__ == "__main__":
client = TcpClient(host="192.168.58.108", port=8080, keep_alive=True)
client.init()
# 发送图片数据
# client.send_image("/data/Img/19700101_000834.jpg")
# 发送字节数据
client.send_data(b"Hello, server!")
# 关闭连接
# client.close()
while True:
data = client.recv()
if data:
print(f"[INFO] 收到服务器数据(type={type(data)}): {data}")
time.sleep(1)
硬件板卡
庐山派
硬件板卡
庐山派
硬件板卡
庐山派
硬件板卡
庐山派
软件版本
CanMV v1.4-2-g873d625(based on Micropython e00a144) on 2025-09-12; k230_canmv_lckfb with K230