socket通讯

Viewed 15

问题描述


无论是有线或是无线方式,获取IP联网部分好用,但是建立TCP通讯时,重新运行代码就会报错,是因为上一次的代码中操作模块后建立的连接还存在吗,怎么彻底清除保证最新一次连接成功,当前只能彻底断电后才能再次好使

报错种类1:socket_socket() -> errno 12
报错种类2:run connect failed.

[ERROR] 初始化连接时发生未知错误: [Errno 12] ENOMEM

import network
import socket
import struct
import time
import os
import sys
import json
from machine import RTC
from ubinascii import hexlify   # 将字节串(bytes)转换为十六进制字符串(hex string)
import network
import asyncio

## ---------------【Net基类参数】----------------------------
class NetBase:
    def __init__(self, wireless=True):
        """
        初始化 NetBase 对象
        """
        self.wireless = wireless    # 是否为无线网络
        self.mac = ''               # 连接成功后的 MAC 地址
        self.ip = ''                # 连接成功后的 IP 地址
        self.netmask = ''           # 连接成功后的子网掩码
        self.gateway = ''           # 连接成功后的网关地址
        self.dns = ''               # 连接成功后的 DNS 服务器地址
        self.net = None             # 网络对象,有线无线都叫这个
        # 激活 WLAN 模块,此功能不需要了,是常开
        # self.net.active(True)     # Network (rt-smart) is always active and cannot be disabled.
    
    @property
    def isconnected(self):
        """
        获取连接状态
        :return: 连接状态,成功返回 True,失败返回 False
        """
        if self.net is None:
            print("未初始化网络对象")
            return False
        # print(f"连接状态:{self.net.isconnected()}")
        return self.net.isconnected()

    def internet(self, test_address=('www.baidu.com', 80), timeout=3):
        """
        尝试域名解析访问外部网址
        :param test_address: 用于测试的服务器地址和端口,默认为百度服务器
        :param timeout: 连接超时时间(秒)
        :return: 可以访问返回 True,不能访问返回 False
        """
        try:
            addr_info = socket.getaddrinfo(*test_address)    # 解包+解析
            # [(2, 1, 0, '', ('110.242.69.21', 80))]
            ip = addr_info[0][4][0]
            port = addr_info[0][4][1]

            # 创建套接字
            s = socket.socket()
            # 设置超时时间
            s.settimeout(timeout)
            
            # 尝试连接测试服务器
            s.connect((ip, port))
            s.close()
            print(f'测试连接正常: {ip}:{port}')
            return True
        except Exception as e:
            print(f'未知错误: {e}')
            return False
    
    def get_config(self):
        """
        获取当前网络配置信息
        :return: 包含网络配置的字典
        """
        self.mac = self.net.config('mac')
        self.ip, self.netmask, self.gateway, self.dns = self.net.ifconfig()
        link_type = 'WLAN/WIFI' if self.wireless else 'LAN/Ethernet'  # 网络类型
        config = {
            'connected': self.isconnected,
            'mac': self.mac,
            'ip': self.ip,
            'netmask': self.netmask,
            'gateway': self.gateway,
            'dns': self.dns,
            'link_type': link_type,
        }
        if self.wireless:
            config.update({
                'ssid': self.ssid,
            })
        return config

## ---------------【基础-WIFI】----------------------------
class WIFI(NetBase):
    def __init__(self):
        # 显式调用父类初始化
        super().__init__()
        """
        初始化 WIFI 对象
        :param ssid: 无线网络名称
        :param password: 无线网络密码
        :param timeout: 连接超时时间(秒)
        """
        self.net = network.WLAN(network.STA_IF)  # 创建 WLAN 对象并设置为 STA 模式
        self.ssid = ''              # 无线网络名称
        self.password = ''          # 无线网络密码
        self.timeout = 0            # 连接超时时间

    def __str__(self):
        """
        返回 WIFI 连接状态的字符串表示
        :return: 包含 WIFI 连接状态的字符串
        """
        return f"WIFI(ssid={self.ssid}, ip={self.ip})"

    def init(self, ssid='YJOT-2.4G', password='yijia0516', timeout=10):
        """
        初始化并连接 WIFI
        :return: 连接结果,成功返回 True,失败返回 False
        """
        self.ssid = ssid            # 无线网络名称
        self.password = password    # 无线网络密码
        self.timeout = timeout      # 连接超时时间

        # 检查是否已连接
        if not self.net.isconnected():
            # 扫描周围网络名称是否存在目标WIFI
            found = False
            print(f'扫描网络中...{self.ssid}')
            networks = self.net.scan()  # 使用正确的net对象进行扫描 
            for item in networks:   #item -> rt_wlan_info字典,包含bytes信息
                ssid = item.ssid  # 不同平台的返回格式可能不同,使用索引更兼容?
                if self.ssid.encode('utf-8') == ssid:
                    found = True
                    print(f'找到目标网络: {self.ssid}')
                    break            
                else:
                    print(f'目标网络不匹配: {self.ssid}')
            if not found:
                print(f'未找到目标网络: {self.ssid}')
                return False
            
            # 连接网络
            self.net.connect(self.ssid, self.password)
            # 等待连接完成或超时
            start_time = time.time()
            while not self.net.isconnected() and (time.time() - start_time) < self.timeout:
                time.sleep(0.1)
            # 检查连接状态
            if self.net.isconnected():
                print('连接成功')
                return True
            else:
                print('连接超时')
                return False
        else:
            print('已连接')
            self.ip = self.net.ifconfig()[0]
            print(f'IP地址: {self.ip}')
            return True

# 调用示例
net = WIFI()
net.init("YJOT","yijia0516")
print(net.get_config())
net.internet()

## ---------------【基础-以太网】----------------------------
class Ethernet(NetBase):
    def __init__(self):
        """
        初始化以太网对象
        """
        # ✅ 显式调用父类初始化
        super().__init__(wireless=False)
        self.net = network.LAN()            # 创建 LAN 对象并设置为 LAN 模式
        self.cus_ipconfig = None            # 自定义IP配置  (ip, netmask, gateway, dns)
    def init(self, ip=None, netmask=None, gateway=None, dns=None):
        """
        初始化以太网
        :param ip: IP 地址
        :param netmask: 子网掩码
        :param gateway: 网关
        :param dns: DNS 服务器
        :return: 初始化结果,成功返回 True,失败返回 False
        """
        # 集体判断:all() 函数会判断 列表里的所有元素是否都为真,如果都为真,则返回True,否则返回False
        self.cus_ipconfig = (ip, netmask, gateway, dns) if all([ip, netmask, gateway, dns]) else None

        
        if self.cus_ipconfig:   # 指定就配置
            self.net.ifconfig(self.cus_ipconfig)
            self.get_config()
            if ip != self.ip:
                print(f"以太网配置失败,当前IP地址为{self.ip}")
                return False
        
        if self.isconnected:
            self.get_config()
            if self.ip == "0.0.0.0" and self.netmask == "0.0.0.0" and self.gateway == "0.0.0.0" and self.dns == "0.0.0.0":
                print(f"以太网配置失败,当前IP地址为{self.ip},请检查网线是否插入")
                return False
            else:
                return True
        else:
            print(f"以太网配置失败,当前IP地址为{self.ip},请检查网线是否插入")
            return False

# # # 调用示例
# net = Ethernet()
# net.init()
# print(net.get_config())
# net.internet()
# print("IP:",net.ip)


## ---------------【LAN统一管理类】----------------------------
class NetworkManager(NetBase):
    def __init__(self, wireless=True, ssid=None, password=None, timeout=10,
                 ip=None, netmask=None, gateway=None, dns=None):
        """
        初始化网络管理器(统一 WiFi / LAN)
        :param wireless: 是否使用无线网络
        :param ssid: WiFi 名称(无线模式下必须)
        :param password: WiFi 密码(无线模式下必须)
        :param timeout: WiFi 连接超时
        :param ip: LAN 静态 IP
        :param netmask: LAN 子网掩码
        :param gateway: LAN 网关
        :param dns: LAN DNS
        """
        super().__init__(wireless=wireless)
        
        # 有线 / 无线对象初始化
        if wireless:
            if not ssid or not password:
                raise ValueError("无线模式下必须提供 ssid 和 password")
            self.net = WIFI()
            self.ssid = ssid
            self.password = password
            self.timeout = timeout
        else:
            self.net = Ethernet()
            self.cus_ipconfig = (ip, netmask, gateway, dns) if all([ip, netmask, gateway, dns]) else None

    def init(self):
        """
        初始化并连接网络(WiFi / LAN)
        :return: 连接成功返回 True,失败返回 False
        """
        if self.wireless:
            return self.net.init(ssid=self.ssid, password=self.password, timeout=self.timeout)
        else:
            if self.cus_ipconfig:
                return self.net.init(*self.cus_ipconfig)
            else:
                return self.net.init()

    @property
    def isconnected(self):
        return self.net.isconnected

    def internet(self, test_address=('www.baidu.com', 80), timeout=3):
        return self.net.internet(test_address, timeout)

    def get_config(self):
        config = self.net.get_config()
        return config

# # 调用示例
# # WiFi模式
# network = NetworkManager(wireless=True, ssid='YJOT-2.4G', password='yijia0516', timeout=10)
# # 以太网模式
# # network = NetworkManager(wireless=False, ip='192.168.58.100', netmask='255.255.255.0', gateway='192.168.58.1', dns='192.168.58.1')
# # 初始化
# network.init()
# # DNS解析
# network.internet()
# # 配置查看
# print(f"配置:{network.get_config()}")

## ---------------【TCP服务端 - 有问题接不到】----------------------------
class TcpServer:
    def __init__(self, host='0.0.0.0', port=8081):
        self.host = host
        self.port = port
        self.sock = None

    def handle_client(self, client_sock, client_addr):
        """处理客户端连接"""
        try:
            # 接收4字节文件大小头部
            raw_header = self.recvall(client_sock, 4)
            if not raw_header:
                print("[WARN] 没收到长度信息")
                return

            filesize = struct.unpack("!I", raw_header)[0]
            print(f"[INFO] 正在接收 {filesize} 字节数据")

            # 接收实际数据
            data = self.recvall(client_sock, filesize)
            if not data:
                print("[ERROR] 未收到数据")
                return

            # 打印接收到的完整数据
            print(f"[INFO] 接收到的数据:{data}")
            print(f"[INFO] 数据的总长度:{len(data)} 字节")

        except Exception as e:
            print(f"[ERROR] 客户端 {client_addr} 处理失败: {e}")
        finally:
            try:
                client_sock.close()
                print(f"[INFO] 已关闭连接 {client_addr}")
            except Exception as e:
                print(f"[ERROR] 关闭连接失败: {e}")

    def recvall(self, sock, length):
        """辅助函数:确保接收固定长度数据"""
        data = b''
        try:
            while len(data) < length:
                chunk = sock.recv(length - len(data))
                if not chunk:
                    print("[WARN] 未收到更多数据,连接可能关闭")
                    return None
                data += chunk
        except socket.error as e:
            print(f"[ERROR] 接收数据出错: {e}")
            return None
        return data

# # 调用示例
# net = NetworkManager(wireless=False)  # Ethernet()
# net.init()
# print(net.get_config())
# net.internet()
# print("IP:",net.ip)
# # 启动服务器
# if __name__ == '__main__':
#     server = TcpServer(host=net.ip,port=8081)

#     # 启动服务器监听
#     server.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#     try:
#         server.sock.bind((server.host, server.port))
#         server.sock.listen(5)
#         print(f"[INFO] TCP 服务器已启动,监听 {server.host}:{server.port}")
#     except OSError as e:
#         print(f"[ERROR] 无法绑定端口 {server.port}: {e}")
#         # exit(1)

#     try:
#         while True:
#             # 接收一个客户端连接
#             client_sock, client_addr = server.sock.accept()
#             print(f"[INFO] 接收到客户端连接 {client_addr}")

#             # 处理客户端连接(阻塞处理)
#             server.handle_client(client_sock, client_addr)

#     except Exception as e:
#         print(f"[ERROR] 服务器出现异常: {e}")
#     finally:
#         try:
#             server.sock.close()
#             print("[INFO] 服务器关闭")
#         except Exception as e:
#             print(f"[ERROR] 关闭服务器失败: {e}")

## ---------------【TCP客户端】----------------------------
class TcpClient:
    def __init__(self, host="101.35.21.193", port=13719, keep_alive=True, timeout=5):
        """
        初始化 TCP 客户端
        :param host: 服务器的地址
        :param port: 服务器的端口
        :param keep_alive: 是否保持长连接,默认为 True
        :param timeout: 连接超时时间,默认为 5 秒
        """
        self.host = host
        self.port = port
        self.keep_alive = keep_alive
        self.timeout = timeout
        self.sock = None
        self.buff = bytearray()

    def _create(self):
        """创建并配置 socket"""
        # 若旧socket还存在则关闭
        if self.sock:
            print(f"[INFO] 关闭旧连接 {self.host}:{self.port}")
            self.close()

        s = socket.socket()
        s.settimeout(self.timeout)
        return s

    def init(self):
        """初始化连接"""
        self.close()  # 确保旧连接彻底关闭
        # gc.collect()  # 主动清理旧 socket 内存
        
        try:
            self.sock = self._create()            
            print(f"[INFO] 尝试连接到 {self.host}:{self.port}")
            self.sock.connect((self.host, self.port))
            if self.keep_alive:
                self.sock.settimeout(None)  # 保持长连接,设置无超时
            print(f"[INFO] 成功连接到 {self.host}:{self.port}")                
            return True
        except Exception as e:
            print(f"[ERROR] 初始化连接时发生未知错误: {e}")
            self.sock = None
            return False
    def recv(self, size=1024):
        """接收数据"""
        if not self.sock:
            # print("[WARN] 无连接,无法接收")
            return b""
        try:
            data = self.sock.recv(size)
            return data
        except Exception as e:
            print(f"[ERROR] 接收失败: {e}")
            self.close()
            return b""

    def send_data(self, data):
        """
        发送字节数据到服务器
        :param data: 发送的字节数据
        """
        if not self.sock:
            print("[WARN] 无连接,尝试初始化连接")
            return False
            # self.init()  # 如果连接未建立,先建立连接

        if self.sock:
            try:
                filesize = len(data)
                header = struct.pack("!I", filesize)  # 包含文件大小的头部
                self.sock.sendall(header)  # 发送文件头(文件大小)
                self.sock.sendall(data)  # 发送数据
                print("[INFO] 数据发送完成")
            except Exception as e:
                print(f"[ERROR] 发送数据失败: {e}")
        else:
            print("[ERROR] 连接不可用,无法发送数据")

    def send_image(self, image_path):
        """
        发送图片数据
        :param image_path: 图片的文件路径
        """
        try:
            with open(image_path, 'rb') as f:
                img_data = f.read()
                self.send_data(img_data)
            print(f"[INFO] 图片 {image_path} 发送完成")
        except Exception as e:
            print(f"[ERROR] 读取图片失败: {e}")

    def close(self):
        """安全关闭连接"""
        if self.sock:
            try:
                self.sock.close()
                print("[INFO] 连接已关闭")
            except Exception as e:
                print(f"[WARN] 关闭连接异常: {e}")
            finally:
                self.sock = None
                gc.collect()  # 强制释放底层socket内存
        else:
            print("[ERROR] 连接未建立,无法关闭")

# 调用示例
# net = WIFI()
# net.init("YJOT","yijia0516")
# # net = NetworkManager(wireless=False)
# # net.init()
# print(net.get_config())
# net.internet()
# print("IP:",net.ip)

if __name__ == "__main__":
    client = TcpClient(host="192.168.58.108", port=8080, keep_alive=True)
    client.init()

    # 发送图片数据
    # client.send_image("/data/Img/19700101_000834.jpg")

    # 发送字节数据
    client.send_data(b"Hello, server!")
    # 关闭连接
    # client.close()
    while True:
        data = client.recv()
        if data:
            print(f"[INFO] 收到服务器数据(type={type(data)}): {data}")
        time.sleep(1)

硬件板卡


庐山派

硬件板卡


庐山派

硬件板卡


庐山派

硬件板卡


庐山派

软件版本


CanMV v1.4-2-g873d625(based on Micropython e00a144) on 2025-09-12; k230_canmv_lckfb with K230

1 Answers

你好,请发一下复现代码。以及固件版本信息。

更新到问题里了

好的,我们看一下怎么回事。