workflow的网络io
目前看下来感觉workflow跟别的网络库在io层面有个很重要的不同点
大家虽然都是使用epoll,也是one loop per thread的模型,但在queue的时候是有区别的
比如外部一个线程往一个epoll wait的线程里做add相关操作
这个时候有一个kick的机制,libevent或者别的都是类似的,要么是考虑跨平台的pipe fd,要么是event fd,都存在一个打断
但是workflow的设计跟这些都不太一样
比如看一个任务需要被add的时候,这段代码会通过mpoller直接走到poller_add里,在mpoll里默认会选一个poller,通过fd取mod的方式选
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
int poller_add(const struct poller_data *data, int timeout, poller_t *poller)
{
struct __poller_node *node;
node = __poller_new_node(data, timeout, poller);
if (!node)
return -1;
pthread_mutex_lock(&poller->mutex);
if (!poller->nodes[data->fd])
{
if (__poller_add_fd(data->fd, node->event, node, poller) >= 0)
{
if (timeout >= 0)
__poller_insert_node(node, poller);
else
list_add_tail(&node->list, &poller->no_timeo_list);
poller->nodes[data->fd] = node;
node = NULL;
}
}
else
errno = EEXIST;
pthread_mutex_unlock(&poller->mutex);
if (node == NULL)
return 0;
free(node->res);
free(node);
return -1;
}
随后直接把这个fd弄到poller的epoll fd里去
1
2
3
4
5
6
7
8
9
10
11
static inline int __poller_add_fd(int fd, int event, void *data,
poller_t *poller)
{
struct epoll_event ev = {
.events = event,
.data = {
.ptr = data
}
};
return epoll_ctl(poller->pfd, EPOLL_CTL_ADD, fd, &ev);
}
通过poller内的mutex保证在poller内的node操作是线程安全的
另外往一个poller上做fd的修改的机制还是类似的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
int poller_mod(const struct poller_data *data, int timeout, poller_t *poller)
{
struct __poller_node *node;
struct __poller_node *orig;
int stopped = 0;
node = __poller_new_node(data, timeout, poller);
if (!node)
return -1;
pthread_mutex_lock(&poller->mutex);
orig = poller->nodes[data->fd];
if (orig)
{
if (__poller_mod_fd(data->fd, orig->event, node->event, node, poller) >= 0)
{
if (orig->in_rbtree)
__poller_tree_erase(orig, poller);
else
list_del(&orig->list);
orig->error = 0;
orig->state = PR_ST_MODIFIED;
stopped = poller->stopped;
if (!stopped)
{
orig->removed = 1;
write(poller->pipe_wr, &orig, sizeof (void *));
}
if (timeout >= 0)
__poller_insert_node(node, poller);
else
list_add_tail(&node->list, &poller->no_timeo_list);
poller->nodes[data->fd] = node;
node = NULL;
}
}
else
errno = ENOENT;
pthread_mutex_unlock(&poller->mutex);
if (stopped)
{
free(orig->res);
poller->callback((struct poller_result *)orig, poller->context);
}
if (node == NULL)
return 0;
free(node->res);
free(node);
return -1;
}
这其实就等于告诉epoll,你有时间执行了再去处理这段。
另外这个poller node里针对读写的设计也非常的骚。。。
- 看起来这个fd长期保持EPOLLIN
- 有写任务的时候先同步写
- 数据没法写完,epollctl改EPOLLOUT,开始异步发送
- 异步发送完了再把fd设置位回EPOLLIN
每个node一次只能关注一件事,这里能想到的点就是他可能为了写时候不要异步,以为操作系统可能为了写去调整send buffer的大小
简单的说,看了io这部分还是惊了一下吧,但暂时没找到相关的bench来表明这个是真的快在这里的
This post is licensed under CC BY 4.0 by the author.