测试发现所有和绝对时间相关的函数都是异常的 这该怎么解决 目前测试过的有
- mq_timedreceive
- mq_timedsend
- sem_timedwait
- pthread_cond_timedwait
- pselect
全都是异常的 要不然就立即返回 要不然就一直阻塞永不退出
测试发现所有和绝对时间相关的函数都是异常的 这该怎么解决 目前测试过的有
全都是异常的 要不然就立即返回 要不然就一直阻塞永不退出
你好,这个是由于k230的rtsmart版本比较古老,部分posix接口有问题,建议参考micropython的实现,建立一个device,通过ioctl在内核中完成这些。
贴一个可用的mq胶合层驱动在这里
mq.h
/**
* @file zj_mq.h
* @brief 内核态和用户态共享的消息队列定义
*/
/* 消息队列驱动头文件 */
#ifndef __ZJ_MQ_H__
#define __ZJ_MQ_H__
#include <rtthread.h>
/* 设备名称 */
#define ZJ_MQ_DEVICE_NAME "/dev/zj_mq"
/* IOCTL命令 */
#define ZJ_MQ_IOC_MAGIC 'M'
#define ZJ_MQ_IOC_CREATE _IOW(ZJ_MQ_IOC_MAGIC, 1, struct zj_mq_create_param) /* 创建消息队列 */
#define ZJ_MQ_IOC_DELETE _IOW(ZJ_MQ_IOC_MAGIC, 2, char[RT_NAME_MAX]) /* 删除消息队列 */
#define ZJ_MQ_IOC_SEND _IOW(ZJ_MQ_IOC_MAGIC, 3, struct zj_mq_transfer_param) /* 发送消息 */
#define ZJ_MQ_IOC_RECEIVE _IOWR(ZJ_MQ_IOC_MAGIC, 4, struct zj_mq_transfer_param) /* 接收消息 */
/* 创建消息队列参数 */
struct zj_mq_create_param {
char name[RT_NAME_MAX]; /* 消息队列名称 */
rt_size_t mq_msgsize; /* 消息大小 */
rt_size_t mq_maxmsg; /* 最大消息数量 */
rt_uint8_t mq_flags; /* 消息队列标志 */
};
/* 消息队列数据传输公共参数 */
struct zj_mq_transfer_param {
char name[RT_NAME_MAX]; /* 消息队列名称 */
void* data; /* 消息数据指针 */
rt_size_t size; /* 数据大小 */
rt_int32_t timeout; /* 超时时间(ms),0为非阻塞,负值为永久等待 */
};
#endif /* __ZJ_MQ_H__ */
mq.c
/**
* @file zj_mq.c
* @brief 消息队列驱动实现 - 面向用户态应用的RT-Thread消息队列胶合层
*
* 本驱动提供了用户态应用通过设备文件接口访问RT-Thread内核消息队列的能力。
* 用户态应用可以通过open、close、ioctl等标准文件操作使用该功能。
*
* 设备操作说明:
* 1. 打开设备: open("/dev/zj_mq", O_RDWR);
* 2. 关闭设备: close(fd);
* 3. 使用ioctl进行各种操作:
* - 创建队列: ioctl(fd, ZJ_MQ_IOC_CREATE, &create_param);
* - 删除队列: ioctl(fd, ZJ_MQ_IOC_DELETE, queue_name);
* - 发送消息: ioctl(fd, ZJ_MQ_IOC_SEND, &send_param);
* - 接收消息: ioctl(fd, ZJ_MQ_IOC_RECEIVE, &receive_param);
*/
#include <dfs_posix.h>
#include <fcntl.h>
#include <lwp.h>
#include <lwp_user_mm.h>
#include <rtdevice.h>
#include <rthw.h>
#include <rtthread.h>
#include <stdio.h>
#include <string.h>
#include <sys/ioctl.h>
#include "zj_mq.h"
#define DBG_TAG "zj_mq"
#ifdef RT_DEBUG
#define DBG_LVL DBG_LOG
#else
#define DBG_LVL DBG_WARNING
#endif
#define DBG_COLOR
#include <rtdbg.h>
#define ZJ_MQ_NAME "zj_mq"
#define ZJ_MQ_MAX_QUEUE_NUM 64 /* 最大支持同时打开的队列数 */
#define ZJ_MQ_MAX_PROCESSES 32 /* 最大支持同时打开的进程数 */
/* 消息队列结构 */
struct zj_mq {
char name[RT_NAME_MAX]; /* 消息队列名称 */
rt_bool_t used; /* 使用标志 */
int creator_pid; /* 创建者的进程ID */
rt_mq_t rt_mq; /* RT-Thread消息队列指针 */
};
/* 消息队列设备 */
struct zj_mq_device {
struct rt_device dev;
rt_mutex_t zj_mq_lock;
struct zj_mq zj_mq_pool[ZJ_MQ_MAX_QUEUE_NUM];
rt_uint32_t opened_count;
int opened_processes[ZJ_MQ_MAX_PROCESSES];
};
/* 消息队列控制块集合 */
static struct zj_mq_device zj_mq_dev;
/* 获取消息队列控制块 */
static struct zj_mq* zj_mq_find(const char* name)
{
struct zj_mq* mq = RT_NULL;
for (int i = 0; i < ZJ_MQ_MAX_QUEUE_NUM; i++) {
if (zj_mq_dev.zj_mq_pool[i].used && !rt_strncmp(zj_mq_dev.zj_mq_pool[i].name, name, RT_NAME_MAX)) {
mq = &zj_mq_dev.zj_mq_pool[i];
break;
}
}
return mq;
}
/* 分配一个消息队列控制块 */
static struct zj_mq* zj_mq_alloc(void)
{
struct zj_mq* mq = RT_NULL;
for (int i = 0; i < ZJ_MQ_MAX_QUEUE_NUM; i++) {
if (!zj_mq_dev.zj_mq_pool[i].used) {
mq = &zj_mq_dev.zj_mq_pool[i];
break;
}
}
return mq;
}
/* 创建消息队列 */
static rt_int32_t zj_mq_create(struct zj_mq_create_param* param, int creator_pid)
{
struct zj_mq* mq;
rt_int32_t ret = RT_EOK;
if (!param || !param->name[0] || param->mq_maxmsg <= 0 || param->mq_msgsize <= 0) {
return -RT_EINVAL;
}
rt_mutex_take(zj_mq_dev.zj_mq_lock, RT_WAITING_FOREVER);
/* 检查是否已存在同名队列 */
if (zj_mq_find(param->name) != RT_NULL) {
rt_mutex_release(zj_mq_dev.zj_mq_lock);
return -RT_EBUSY;
}
/* 获取一个未使用的消息队列控制块 */
mq = zj_mq_alloc();
if (mq == RT_NULL) {
rt_mutex_release(zj_mq_dev.zj_mq_lock);
return -RT_ENOMEM;
}
rt_memset(mq, 0, sizeof(struct zj_mq));
rt_strncpy(mq->name, param->name, RT_NAME_MAX);
mq->creator_pid = creator_pid;
mq->used = RT_TRUE;
/* 创建RT-Thread消息队列 */
mq->rt_mq = rt_mq_create(param->name, param->mq_msgsize, param->mq_maxmsg, RT_IPC_FLAG_FIFO);
if (mq->rt_mq == RT_NULL) {
mq->used = RT_FALSE;
rt_mutex_release(zj_mq_dev.zj_mq_lock);
return -RT_ENOMEM;
}
rt_mutex_release(zj_mq_dev.zj_mq_lock);
// LOG_I("进程 %d 创建消息队列 '%s', 最大消息数: %d, 消息大小: %d", creator_pid, param->name, param->mq_maxmsg,
// param->mq_msgsize);
return ret;
}
/* 删除消息队列 */
static rt_int32_t zj_mq_delete(const char* name, int pid)
{
struct zj_mq* mq;
rt_int32_t ret = RT_EOK;
if (!name || !name[0]) {
return -RT_EINVAL;
}
rt_mutex_take(zj_mq_dev.zj_mq_lock, RT_WAITING_FOREVER);
/* 查找队列 */
mq = zj_mq_find(name);
if (mq == RT_NULL) {
rt_mutex_release(zj_mq_dev.zj_mq_lock);
return -RT_EEMPTY;
}
/* 检查是否是创建者 */
if (mq->creator_pid != pid) {
LOG_W("进程 %d 尝试删除由进程 %d 创建的队列 '%s',拒绝操作", pid, mq->creator_pid, name);
rt_mutex_release(zj_mq_dev.zj_mq_lock);
return -RT_ERROR;
}
/* 销毁RT-Thread消息队列 */
if (mq->rt_mq) {
rt_mq_delete(mq->rt_mq);
mq->rt_mq = RT_NULL;
}
/* 标记为未使用 */
mq->used = RT_FALSE;
rt_mutex_release(zj_mq_dev.zj_mq_lock);
// LOG_I("进程 %d 删除消息队列 '%s'", pid, name);
return ret;
}
/* 发送消息 */
static rt_int32_t zj_mq_send(struct zj_mq_transfer_param* param, int pid)
{
struct zj_mq* mq;
rt_int32_t ret = RT_EOK;
if (!param || !param->name[0] || !param->data || param->size <= 0) {
return -RT_EINVAL;
}
rt_mutex_take(zj_mq_dev.zj_mq_lock, RT_WAITING_FOREVER);
/* 查找队列 */
mq = zj_mq_find(param->name);
if (mq == RT_NULL) {
rt_mutex_release(zj_mq_dev.zj_mq_lock);
return -RT_EEMPTY;
}
rt_mutex_release(zj_mq_dev.zj_mq_lock);
/* 发送消息到RT-Thread消息队列 */
rt_err_t rt_err;
if (param->timeout == 0) {
/* 非阻塞发送 */
rt_err = rt_mq_send(mq->rt_mq, param->data, param->size);
} else if (param->timeout == RT_WAITING_FOREVER) {
/* 永久阻塞发送 */
rt_err = rt_mq_send_wait(mq->rt_mq, param->data, param->size, RT_WAITING_FOREVER);
} else {
/* 超时阻塞发送 */
rt_err = rt_mq_send_wait(mq->rt_mq, param->data, param->size, param->timeout);
}
return rt_err;
}
/* 接收消息 */
static rt_int32_t zj_mq_receive(struct zj_mq_transfer_param* param, int pid)
{
struct zj_mq* mq;
rt_int32_t ret = RT_EOK;
if (!param || !param->name[0] || !param->data || param->size <= 0) {
return -RT_EINVAL;
}
rt_mutex_take(zj_mq_dev.zj_mq_lock, RT_WAITING_FOREVER);
/* 查找队列 */
mq = zj_mq_find(param->name);
if (mq == RT_NULL) {
rt_mutex_release(zj_mq_dev.zj_mq_lock);
return -RT_EEMPTY;
}
rt_mutex_release(zj_mq_dev.zj_mq_lock);
/* 从RT-Thread消息队列接收消息 */
rt_err_t rt_err;
if (param->timeout == 0) {
/* 非阻塞接收 */
rt_err = rt_mq_recv(mq->rt_mq, param->data, param->size, 0);
} else if (param->timeout == RT_WAITING_FOREVER) {
/* 永久阻塞接收 */
rt_err = rt_mq_recv(mq->rt_mq, param->data, param->size, RT_WAITING_FOREVER);
} else {
/* 超时阻塞接收 */
rt_err = rt_mq_recv(mq->rt_mq, param->data, param->size, param->timeout);
}
return rt_err;
}
/* 文件操作接口 */
static int zj_mq_fops_open(struct dfs_fd* fd)
{
int pid = lwp_getpid();
/* 检查该进程是否已经打开过一次设备 */
for (int i = 0; i < zj_mq_dev.opened_count; i++) {
if (zj_mq_dev.opened_processes[i] == pid) {
LOG_E("进程 %d 已经打开了zj_mq设备,同一个进程不允许重复打开", pid);
return -RT_EBUSY;
}
}
/* 添加到已打开进程列表 */
if (zj_mq_dev.opened_count < ZJ_MQ_MAX_PROCESSES) {
zj_mq_dev.opened_processes[zj_mq_dev.opened_count++] = pid;
// LOG_I("进程 %d 已加入打开设备列表,当前共有 %d 个进程正在使用zj_mq设备", pid, zj_mq_dev.opened_count);
} else {
LOG_E("已打开进程数超出限制,无法为进程 %d 打开设备", pid);
return -RT_EFULL;
}
/* 在文件描述符用户数据中存储进程ID */
fd->data = (void*)(long)pid;
return RT_EOK;
}
static int zj_mq_fops_close(struct dfs_fd* fd)
{
/* 从文件描述符用户数据中获取打开时存储的进程ID,避免应用异常退出时调用lwp_getpid()获取不到正确的pid */
int pid = (int)(long)fd->data;
if (pid <= 0) {
LOG_W("无效的进程ID: %d,可能进程已异常终止", pid);
pid = lwp_getpid(); /* 尝试获取当前进程ID作为后备 */
LOG_D("使用当前进程ID: %d 进行清理", pid);
}
/* 从已打开进程列表中移除 */
for (int i = 0; i < zj_mq_dev.opened_count; i++) {
if (zj_mq_dev.opened_processes[i] == pid) {
/* 将后面的元素前移 */
for (int j = i; j < zj_mq_dev.opened_count - 1; j++) {
zj_mq_dev.opened_processes[j] = zj_mq_dev.opened_processes[j + 1];
}
zj_mq_dev.opened_count--;
// LOG_I("进程 %d 已从打开设备列表移除,剩余 %d 个进程", pid, zj_mq_dev.opened_count);
break;
}
}
/* 使用超时机制获取互斥锁 */
if (rt_mutex_take(zj_mq_dev.zj_mq_lock, 1000) != RT_EOK) {
LOG_E("关闭设备时获取全局互斥锁超时,跳过资源清理");
return RT_EOK;
}
/* 检查是否有该进程创建的队列,进行清理 */
for (int i = 0; i < ZJ_MQ_MAX_QUEUE_NUM; i++) {
if (zj_mq_dev.zj_mq_pool[i].used) {
struct zj_mq* mq = &zj_mq_dev.zj_mq_pool[i];
/* 该队列由当前关闭的进程创建,需要释放 */
if (mq->creator_pid == pid) {
// LOG_I("清理进程 %d 创建的队列 '%s'", pid, mq->name);
/* 删除RT-Thread消息队列 */
if (mq->rt_mq) {
rt_mq_delete(mq->rt_mq);
mq->rt_mq = RT_NULL;
}
/* 标记为未使用 */
mq->used = RT_FALSE;
}
}
}
rt_mutex_release(zj_mq_dev.zj_mq_lock);
return RT_EOK;
}
static int zj_mq_fops_ioctl(struct dfs_fd* fd, int cmd, void* args)
{
rt_err_t result = RT_EOK;
int pid = (int)(long)fd->data; // 获取本次调用的调用者进程ID
switch (cmd) {
case ZJ_MQ_IOC_CREATE: {
struct zj_mq_create_param param;
rt_memset(¶m, 0, sizeof(param));
size_t copy_result = lwp_get_from_user(¶m, args, sizeof(param));
if (copy_result != sizeof(param)) {
LOG_E("从用户空间拷贝创建参数失败: 期望%zu字节,实际拷贝%zu字节", sizeof(param), copy_result);
result = -RT_ERROR;
break;
}
result = zj_mq_create(¶m, pid);
} break;
case ZJ_MQ_IOC_DELETE: {
char name[RT_NAME_MAX];
rt_memset(name, 0, RT_NAME_MAX);
size_t copy_result = lwp_get_from_user(name, args, RT_NAME_MAX);
if (copy_result == 0) {
LOG_E("从用户空间拷贝队列名称失败");
result = -RT_ERROR;
break;
}
result = zj_mq_delete(name, pid);
} break;
case ZJ_MQ_IOC_SEND: {
struct zj_mq_transfer_param param;
void* msg_buf = RT_NULL;
size_t copy_result = lwp_get_from_user(¶m, args, sizeof(param));
if (copy_result != sizeof(param)) {
LOG_E("从用户空间拷贝发送参数失败: 期望%zu字节,实际拷贝%zu字节", sizeof(param), copy_result);
result = -RT_ERROR;
break;
}
/* 参数检查 */
if (!param.name[0] || !param.data || param.size == 0) {
LOG_E("无效的发送参数: name=%s, msg=%p, len=%zu", param.name, param.data, param.size);
result = -RT_EINVAL;
break;
}
/* 分配内核缓冲区 */
msg_buf = rt_malloc(param.size);
if (msg_buf == RT_NULL) {
LOG_E("无法分配内核缓冲区: %zu 字节", param.size);
result = -RT_ENOMEM;
break;
}
/* 从用户空间复制消息数据 */
size_t data_copied = lwp_get_from_user(msg_buf, param.data, param.size);
if (data_copied != param.size) {
LOG_E("从用户空间拷贝消息数据失败: 期望%zu字节,实际拷贝%zu字节", param.size, data_copied);
result = -RT_ERROR;
rt_free(msg_buf);
break;
}
/* 暂存原始指针 */
void* orig_msg = param.data;
param.data = msg_buf;
/* 调用消息发送函数 */
result = zj_mq_send(¶m, pid);
/* 恢复原始指针并释放缓冲区 */
param.data = orig_msg;
rt_free(msg_buf);
} break;
case ZJ_MQ_IOC_RECEIVE: {
struct zj_mq_transfer_param param;
void* msg_buf = RT_NULL;
void* user_buf; // 保存用户空间的指针
rt_size_t recv_len = 0;
size_t copy_result = lwp_get_from_user(¶m, args, sizeof(param));
if (copy_result != sizeof(param)) {
LOG_E("从用户空间拷贝接收参数失败: 期望%zu字节,实际拷贝%zu字节", sizeof(param), copy_result);
result = -RT_ERROR;
break;
}
/* 参数检查 */
if (!param.name[0] || !param.data || param.size == 0) {
LOG_E("无效的接收参数: name=%s, msg=%p, len=%zu", param.name, param.data, param.size);
result = -RT_EINVAL;
break;
}
/* 保存用户空间缓冲区指针 */
user_buf = param.data;
/* 分配内核缓冲区 */
msg_buf = rt_malloc(param.size);
if (msg_buf == RT_NULL) {
LOG_E("无法分配内核缓冲区: %zu 字节", param.size);
result = -RT_ENOMEM;
break;
}
param.data = msg_buf;
/* 调用消息接收函数 */
result = zj_mq_receive(¶m, pid);
if (result == RT_EOK) {
/* 将接收到的消息数据复制到用户空间 */
size_t put_result = lwp_put_to_user(user_buf, msg_buf, param.size);
if (put_result != param.size) {
LOG_E("复制数据到用户空间失败: 期望%zu字节,实际拷贝%zu字节", param.size, put_result);
result = -RT_ERROR;
}
}
rt_free(msg_buf);
} break;
default:
LOG_E("未知命令: %d (0x%x)", cmd, cmd);
result = -RT_ENOSYS;
break;
}
return result;
}
/* 设备文件操作结构体 */
const static struct dfs_file_ops zj_mq_fops = {
zj_mq_fops_open,
zj_mq_fops_close,
zj_mq_fops_ioctl,
};
/* 初始化消息队列设备 */
static int zj_mq_device_init(void)
{
rt_err_t result = RT_EOK;
/* 初始化消息队列池 */
rt_memset(zj_mq_dev.zj_mq_pool, 0, sizeof(zj_mq_dev.zj_mq_pool));
/* 初始化已打开进程列表 */
rt_memset(zj_mq_dev.opened_processes, 0, sizeof(zj_mq_dev.opened_processes));
zj_mq_dev.opened_count = 0;
/* 创建互斥锁 */
zj_mq_dev.zj_mq_lock = rt_mutex_create("zj_mq_mutex", RT_IPC_FLAG_FIFO);
if (zj_mq_dev.zj_mq_lock == RT_NULL) {
LOG_E("Failed to create zj_mq mutex");
return -RT_ENOMEM;
}
result = rt_device_register(&zj_mq_dev.dev, ZJ_MQ_NAME, RT_DEVICE_FLAG_RDWR | RT_DEVICE_FLAG_STANDALONE);
if (result != RT_EOK) {
LOG_E("Failed to register zj_mq device: %d", result);
rt_mutex_delete(zj_mq_dev.zj_mq_lock);
zj_mq_dev.zj_mq_lock = RT_NULL;
return result;
}
/* 设置文件操作接口 */
zj_mq_dev.dev.fops = &zj_mq_fops;
LOG_I("ZJ Message Queue driver initialized");
return result;
}
INIT_DEVICE_EXPORT(zj_mq_device_init);
sem_timedwait和pthread_cond_timedwait应该是可以用得吧,我看了一下工具链,这2个函数有实现
实现有的 可惜行为并不正确 根据调用时传入的参数 具体的异常表现有2种
另外 最近一周多我根据你的建议尝试在内核中实现一个胶合层 想要通过iocrl来间接的调用内核中的rtrthread的队列 但又带来了一个新问题 内核中的 rt_mq_send_wait_interruptible 和 rt_mq_recv_interruptible进入阻塞状态后 无法正确的被ctrlc中断 是因为我写的问题么? 完整的代码在最后一楼里 已经困扰一周了
是指应用使用了rt_mq_send_wait_interruptible和rt_mq_recv_interruptible之后,无法被ctrl-c中断吗?