在 RT-Thread 中进行一对多通信

  1. 1. Getting Start
    1. 1.1. 声明话题
    2. 1.2. 发布话题
    3. 1.3. 订阅话题
  2. 2. 完整代码

线程间通信(IPC)是 RTOS 中相当重要的一部分,在 RT-Thread 中常用的 IPC 方法有 邮箱消息队列,但这两个方法都有局限:只能适用于 一对一多对一 的情况,没法很好地处理 一对多多对多 的情况,虽然我们可以使用 事件集 实现一对多的同步,但该方法未免有点麻烦。这里就要介绍一下 RT-Thread 上的一个软件包:micro Multi-Communication Node (uMCN),该软件包在 RT-Thread 中引入了在 ROS 中十分常用的通信机制:发布/订阅机制

发布/订阅模式:流处理架构中的瑞士军刀_AlbenXie的博客-CSDN博客_发布订阅架构模式

Getting Start

1
2
3
RT-Thread online packages  --->
tools packages --->
[*] uMCN is a light-weight and powerful IPC library based on the publisher/subscriber model. --->

声明话题

uMCN 可以发送任意类型的消息,仅需使用 MCN_DEFINE 声明 topic 即可。这里我声明了一个名为 my_topic 的话题。

1
2
3
4
typedef struct my_data {
uint32_t num;
} my_data_t;
MCN_DEFINE(my_topic, sizeof(my_data_t));

发布话题

使用 mcn_advertise 来注册新话题。

1
mcn_advertise(MCN_HUB(my_topic), NULL);

注册话题之后我们就可以在话题上发布消息了:

1
2
3
my_data_t data;

mcn_publish(MCN_HUB(my_topic), &data);

订阅话题

订阅话题有两种方式:异步和同步,这里我采用的是同步方式:

1
2
3
4
5
6
7
8
9
10
11
rt_sem_t event = rt_sem_create("thd1_evt", 0, RT_IPC_FLAG_PRIO);
McnNode_t node = mcn_subscribe(MCN_HUB(my_topic), event, NULL);
my_data_t data;

while (1) {
if (mcn_poll_sync(node, RT_WAITING_FOREVER)) {
mcn_copy(MCN_HUB(my_topic), node, &data);

rt_kprintf("message received: %d\n", data.num);
}
}

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#include <rtthread.h>
#include <uMCN.h>

#include <stdint.h>
#include <stddef.h>

typedef struct my_data {
uint32_t num;
} my_data_t;
MCN_DEFINE(my_topic, sizeof(my_data_t));

static rt_thread_t thread_1;
static rt_thread_t thread_2;

static void thread1_entry(void *param);
static void thread2_entry(void *param);

static int my_topic_init() {
mcn_advertise(MCN_HUB(my_topic), NULL);

thread_1 = rt_thread_create("thread1", thread1_entry, RT_NULL, 512, 10, 10);
thread_2 = rt_thread_create("thread1", thread2_entry, RT_NULL, 512, 10, 10);

rt_thread_startup(thread_1);
rt_thread_startup(thread_2);

return 0;
}
INIT_APP_EXPORT(my_topic_init);

static void my_topic_publish() {
static my_data_t data;

mcn_publish(MCN_HUB(my_topic), &data);
data.num++;
}
MSH_CMD_EXPORT(my_topic_publish, publish one data to topic)

void thread1_entry(void *param) {
(void) param;

rt_sem_t event = rt_sem_create("thd1_evt", 0, RT_IPC_FLAG_PRIO);
McnNode_t node = mcn_subscribe(MCN_HUB(my_topic), event, NULL);
my_data_t data;

while (1) {
if (mcn_poll_sync(node, RT_WAITING_FOREVER)) {
mcn_copy(MCN_HUB(my_topic), node, &data);

rt_kprintf("thread1 received: %d\n", data.num);
}
}
}

void thread2_entry(void *param) {
(void) param;

rt_sem_t event = rt_sem_create("thd2_evt", 0, RT_IPC_FLAG_PRIO);
McnNode_t node = mcn_subscribe(MCN_HUB(my_topic), event, NULL);
my_data_t data;

while (1) {
if (mcn_poll_sync(node, RT_WAITING_FOREVER)) {
mcn_copy(MCN_HUB(my_topic), node, &data);

rt_kprintf("thread2 received: %d\n", data.num);
}
}
}
本网站所有文章除特别声明外,均采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。