Post

workflow的网络io

目前看下来感觉workflow跟别的网络库在io层面有个很重要的不同点

大家虽然都是使用epoll,也是one loop per thread的模型,但在queue的时候是有区别的

比如外部一个线程往一个epoll wait的线程里做add相关操作

这个时候有一个kick的机制,libevent或者别的都是类似的,要么是考虑跨平台的pipe fd,要么是event fd,都存在一个打断

但是workflow的设计跟这些都不太一样

比如看一个任务需要被add的时候,这段代码会通过mpoller直接走到poller_add里,在mpoll里默认会选一个poller,通过fd取mod的方式选

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
int poller_add(const struct poller_data *data, int timeout, poller_t *poller)
{
	struct __poller_node *node;

	node = __poller_new_node(data, timeout, poller);
	if (!node)
		return -1;

	pthread_mutex_lock(&poller->mutex);
	if (!poller->nodes[data->fd])
	{
		if (__poller_add_fd(data->fd, node->event, node, poller) >= 0)
		{
			if (timeout >= 0)
				__poller_insert_node(node, poller);
			else
				list_add_tail(&node->list, &poller->no_timeo_list);

			poller->nodes[data->fd] = node;
			node = NULL;
		}
	}
	else
		errno = EEXIST;

	pthread_mutex_unlock(&poller->mutex);
	if (node == NULL)
		return 0;

	free(node->res);
	free(node);
	return -1;
}

随后直接把这个fd弄到poller的epoll fd里去

1
2
3
4
5
6
7
8
9
10
11
static inline int __poller_add_fd(int fd, int event, void *data,
								  poller_t *poller)
{
	struct epoll_event ev = {
		.events		=	event,
		.data		=	{
			.ptr	=	data
		}
	};
	return epoll_ctl(poller->pfd, EPOLL_CTL_ADD, fd, &ev);
}

通过poller内的mutex保证在poller内的node操作是线程安全的

另外往一个poller上做fd的修改的机制还是类似的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
int poller_mod(const struct poller_data *data, int timeout, poller_t *poller)
{
	struct __poller_node *node;
	struct __poller_node *orig;
	int stopped = 0;

	node = __poller_new_node(data, timeout, poller);
	if (!node)
		return -1;

	pthread_mutex_lock(&poller->mutex);
	orig = poller->nodes[data->fd];
	if (orig)
	{
		if (__poller_mod_fd(data->fd, orig->event, node->event, node, poller) >= 0)
		{
			if (orig->in_rbtree)
				__poller_tree_erase(orig, poller);
			else
				list_del(&orig->list);

			orig->error = 0;
			orig->state = PR_ST_MODIFIED;
			stopped = poller->stopped;
			if (!stopped)
			{
				orig->removed = 1;
				write(poller->pipe_wr, &orig, sizeof (void *));
			}

			if (timeout >= 0)
				__poller_insert_node(node, poller);
			else
				list_add_tail(&node->list, &poller->no_timeo_list);

			poller->nodes[data->fd] = node;
			node = NULL;
		}
	}
	else
		errno = ENOENT;

	pthread_mutex_unlock(&poller->mutex);
	if (stopped)
	{
		free(orig->res);
		poller->callback((struct poller_result *)orig, poller->context);
	}

	if (node == NULL)
		return 0;

	free(node->res);
	free(node);
	return -1;
}

这其实就等于告诉epoll,你有时间执行了再去处理这段。

另外这个poller node里针对读写的设计也非常的骚。。。

  1. 看起来这个fd长期保持EPOLLIN
  2. 有写任务的时候先同步写
  3. 数据没法写完,epollctl改EPOLLOUT,开始异步发送
  4. 异步发送完了再把fd设置位回EPOLLIN

每个node一次只能关注一件事,这里能想到的点就是他可能为了写时候不要异步,以为操作系统可能为了写去调整send buffer的大小

简单的说,看了io这部分还是惊了一下吧,但暂时没找到相关的bench来表明这个是真的快在这里的

This post is licensed under CC BY 4.0 by the author.

Trending Tags