tcp client design
muduo的tcpclient属于tcp connection + connector的组合使用封装
首先看到构造tcpclient,然后connection,有connected回调之后就可以关注业务逻辑了、
构造
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
TcpClient::TcpClient(EventLoop* loop,
const InetAddress& serverAddr,
const string& nameArg)
: loop_(CHECK_NOTNULL(loop)),
connector_(new Connector(loop, serverAddr)),
name_(nameArg),
connectionCallback_(defaultConnectionCallback),
messageCallback_(defaultMessageCallback),
retry_(false),
connect_(true),
nextConnId_(1)
{
connector_->setNewConnectionCallback(
std::bind(&TcpClient::newConnection, this, _1));
// FIXME setConnectFailedCallback
LOG_INFO << "TcpClient::TcpClient[" << name_
<< "] - connector " << get_pointer(connector_);
}
这里有connection id,意味着client可以多次connect不同的地址,底下的connection应该是唯一的。随后看到这里有一个connector
1
2
3
4
5
6
7
8
9
Connector::Connector(EventLoop* loop, const InetAddress& serverAddr)
: loop_(loop),
serverAddr_(serverAddr),
connect_(false),
state_(kDisconnected),
retryDelayMs_(kInitRetryDelayMs)
{
LOG_DEBUG << "ctor[" << this << "]";
}
这个应该就是单纯辅助tcp建联过程的组件
connect
随后,tcp调用connect
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
void TcpClient::connect()
{
// FIXME: check state
LOG_INFO << "TcpClient::connect[" << name_ << "] - connecting to "
<< connector_->serverAddress().toIpPort();
connect_ = true;
connector_->start();
}
void Connector::start()
{
connect_ = true;
loop_->runInLoop(std::bind(&Connector::startInLoop, this)); // FIXME: unsafe
}
void Connector::startInLoop()
{
loop_->assertInLoopThread();
assert(state_ == kDisconnected);
if (connect_)
{
connect();
}
else
{
LOG_DEBUG << "do not connect";
}
}
void Connector::connect()
{
int sockfd = sockets::createNonblockingOrDie(serverAddr_.family());
int ret = sockets::connect(sockfd, serverAddr_.getSockAddr());
int savedErrno = (ret == 0) ? 0 : errno;
switch (savedErrno)
{
case 0:
case EINPROGRESS:
case EINTR:
case EISCONN:
connecting(sockfd);
break;
case EAGAIN:
case EADDRINUSE:
case EADDRNOTAVAIL:
case ECONNREFUSED:
case ENETUNREACH:
retry(sockfd);
break;
case EACCES:
case EPERM:
case EAFNOSUPPORT:
case EALREADY:
case EBADF:
case EFAULT:
case ENOTSOCK:
LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno;
sockets::close(sockfd);
break;
default:
LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno;
sockets::close(sockfd);
// connectErrorCallback_();
break;
}
}
connect的流程稍微多点,需要处理socket error的报错,一次看下这些报错的原因
在 C 语言中,case
语句通常用于 switch
语句中,以处理不同的错误代码。在你提供的代码片段中,这些错误代码是与网络编程相关的,通常是在使用套接字(socket)进行连接时可能遇到的错误。以下是你列出的错误代码的解释:
1. EINPROGRESS
- 含义: 表示连接正在进行中。
- 解释: 当非阻塞套接字调用
connect()
时,如果连接尚未完成,系统会返回此错误。程序应继续检查连接状态。
2. EINTR
- 含义: 系统调用被信号中断。
- 解释: 这是一个常见的错误,表示在执行系统调用时被信号打断。通常需要重新尝试该操作。
3. EISCONN
- 含义: 套接字已经连接。
- 解释: 表示尝试连接的套接字已经处于连接状态。这通常是因为套接字之前已经成功连接。
4. EAGAIN
- 含义: 资源暂时不可用。
- 解释: 在非阻塞模式下,操作无法立即完成,通常意味着需要稍后重试。
5. EADDRINUSE
- 含义: 地址已在使用中。
- 解释: 表示所请求的地址(IP 地址和端口)已被另一个套接字使用,无法绑定或连接。
6. EADDRNOTAVAIL
- 含义: 地址不可用。
- 解释: 表示所请求的地址在本地不可用,通常是因为网络配置问题。
7. ECONNREFUSED
- 含义: 连接被拒绝。
- 解释: 表示目标主机的套接字没有在指定的端口上监听,或者目标主机拒绝了连接。
8. ENETUNREACH
- 含义: 网络不可达。
- 解释: 表示目标网络不可达,可能是由于网络配置问题或路由错误。
9. EACCES
- 含义: 权限被拒绝。
- 解释: 表示没有足够的权限来执行请求的操作,通常是因为文件或资源的访问权限设置。
10. EPERM
- 含义: 操作不允许。
- 解释: 表示操作被系统拒绝,通常是因为权限不足。
11. EAFNOSUPPORT
- 含义: 地址族不支持。
- 解释: 表示所请求的地址族(如 IPv4 或 IPv6)不被支持。
12. EALREADY
- 含义: 连接请求已经在进行中。
- 解释: 表示套接字的连接请求已经被发起,不能再次发起连接。
13. EBADF
- 含义: 无效的文件描述符。
- 解释: 表示传递给系统调用的文件描述符无效,通常是因为套接字未正确创建或已关闭。
14. EFAULT
- 含义: 错误的地址。
- 解释: 表示传递给系统调用的指针指向无效的内存地址。
15. ENOTSOCK
- 含义: 不是一个套接字。
- 解释: 表示操作的文件描述符不是一个有效的套接字。
具体的链接和回调处理
首先看正常状况下的connect
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
void Connector::connecting(int sockfd)
{
setState(kConnecting);
assert(!channel_);
channel_.reset(new Channel(loop_, sockfd));
channel_->setWriteCallback(
std::bind(&Connector::handleWrite, this)); // FIXME: unsafe
channel_->setErrorCallback(
std::bind(&Connector::handleError, this)); // FIXME: unsafe
// channel_->tie(shared_from_this()); is not working,
// as channel_ is not managed by shared_ptr
channel_->enableWriting();
}
void Connector::handleWrite()
{
LOG_TRACE << "Connector::handleWrite " << state_;
if (state_ == kConnecting)
{
int sockfd = removeAndResetChannel();
int err = sockets::getSocketError(sockfd);
if (err)
{
LOG_WARN << "Connector::handleWrite - SO_ERROR = "
<< err << " " << strerror_tl(err);
retry(sockfd);
}
else if (sockets::isSelfConnect(sockfd))
{
LOG_WARN << "Connector::handleWrite - Self connect";
retry(sockfd);
}
else
{
setState(kConnected);
if (connect_)
{
newConnectionCallback_(sockfd);
}
else
{
sockets::close(sockfd);
}
}
}
else
{
// what happened?
assert(state_ == kDisconnected);
}
}
int Connector::removeAndResetChannel()
{
channel_->disableAll();
channel_->remove();
int sockfd = channel_->fd();
// Can't reset channel_ here, because we are inside Channel::handleEvent
loop_->queueInLoop(std::bind(&Connector::resetChannel, this)); // FIXME: unsafe
return sockfd;
}
这里创建socket关联channel,然后在handle readevent的时候放弃这个fd,把这个connector的state设置成connected,然后抛newconnection callback。
这里不能立刻reset channel的原因就是这个callback是从channel的handleEventWithGuard
里抛出来的,而且没用shared ptr之类的做延迟释放的gurad,且在这个callback之后还调用的channel的成员变量,因此这里其实不能立刻释放,不然就会崩
tcp client在回调中新建连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void TcpClient::newConnection(int sockfd)
{
loop_->assertInLoopThread();
InetAddress peerAddr(sockets::getPeerAddr(sockfd));
char buf[32];
snprintf(buf, sizeof buf, ":%s#%d", peerAddr.toIpPort().c_str(), nextConnId_);
++nextConnId_;
string connName = name_ + buf;
InetAddress localAddr(sockets::getLocalAddr(sockfd));
// FIXME poll with zero timeout to double confirm the new connection
// FIXME use make_shared if necessary
TcpConnectionPtr conn(new TcpConnection(loop_,
connName,
sockfd,
localAddr,
peerAddr));
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback(
std::bind(&TcpClient::removeConnection, this, _1)); // FIXME: unsafe
{
MutexLockGuard lock(mutex_);
connection_ = conn;
}
conn->connectEstablished();
}
随后就是connection相关的代码了。
connector的rery处理
就是存在一个2倍数的退避,最大delay是30s
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void Connector::retry(int sockfd)
{
sockets::close(sockfd);
setState(kDisconnected);
if (connect_)
{
LOG_INFO << "Connector::retry - Retry connecting to " << serverAddr_.toIpPort()
<< " in " << retryDelayMs_ << " milliseconds. ";
loop_->runAfter(retryDelayMs_/1000.0,
std::bind(&Connector::startInLoop, shared_from_this()));
retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);
}
else
{
LOG_DEBUG << "do not connect";
}
}
tcp client能不能多次connect
再回去看下能不能多次connect不同的地址
1
2
3
4
5
6
7
8
9
10
11
12
13
void Connector::startInLoop()
{
loop_->assertInLoopThread();
assert(state_ == kDisconnected);
if (connect_)
{
connect();
}
else
{
LOG_DEBUG << "do not connect";
}
}
这里看到只要是disconnected的状态下,其实是可以多次connect,然后不断的创建新的connection的