Buffer¶
Buffer¶
class Buffer : public muduo::copyable {
private:
std::vector<char> buffer_;
size_t readerIndex_;
size_t writerIndex_;
};
Buffer
有三个数据成员:buffer_
,readerIndex_
,writerIndex_
,内部示意图:
/// @code
/// +--------------------+------------------+------------------+
/// | dprependable bytes | readable bytes | writable bytes |
/// | | (CONTENT) | |
/// +--------------------+------------------+------------------+
/// | | | |
/// 0 <= readerIndex <= writerIndex <= size
/// @endcode
prependable bytes
:这是为了以预留出空间这样的极小的代价在buffer_头部插入数据相关信息。
+ readable bytes
: 可以读取字节数 → 用户从这里读取,socket缓冲区向这里写入
+ writable bytes
: 可以读写入字节数 → 用户向这里写入,向socket缓冲区发送
+ buffer_
字节序列buffer_
中存储的是网络字节序列,即大端字节序列。因此向buffer_
写入字节需要转为为网络字节序列,从这里读取数据需要转换为主机字节序列。
用户端¶
- 读取
读取的函数readIntxxx
:在内部调用了int64_t readInt64() { int64_t result = peekInt64(); retrieveInt64(); return result; }
peekInt64
和retrieveInt64
,前面的一个函数是操作buffer_
,后面一个函数是操作readerIndex
。网络自己序列的转换由peekInt64
完成:int64_t peekInt64() const { assert(readableBytes() >= sizeof(int64_t)); int64_t be64 = 0; ::memcpy(&be64, peek(), sizeof be64); // 从读取, return sockets::networkToHost64(be64); // 再转换字节序列 }
- 写入
写入使用的函数是appendxxx
:内部需要void appendInt64(int64_t x) { int64_t be64 = sockets::hostToNetwork64(x); // 先转换为网络字节序列 append(&be64, sizeof(int64_t)); // 再写入 }
append
函数,以及hasWritten
,前面用于写入数据到buffer_
中,后者用于操作writerIndex_
,因为是向buffer_
中增加内容,也因此会存在内存不足而需要扩容操作:void append(const char* /*restrict*/ data, size_t len) { // 要确保 ( size() - writerIndex ) > len ensureWritableBytes(len); // 将 [data, data_len) --> [writerIndex, writerIndex + len) std::copy(data, data+len, beginWrite()); // 操作 writerIndex_ hasWritten(len); }
socket 端¶
- 从 socket 中读取数据
使用的是Buffer::readFd(int fd, int* savedErrno)
。因为可能存在粘包问题,因此将所有的数据都读取到缓冲区中,并且自己设置一个二级缓冲区,尽可能一次性的将所有数据从socket
中读取出来。等完成,再写入buffer_
。ssize_t Buffer::readFd(int fd, int* savedErrno) { char extrabuf[65536]; // 64k 字节数 struct iovec vec[2]; const size_t writable = writableBytes(); // 第一级缓冲区是 buffer_ vec[0].iov_base = begin()+writerIndex_; vec[0].iov_len = writable; // 第二级缓冲区是 自定义缓冲区 vec[1].iov_base = extrabuf; vec[1].iov_len = sizeof extrabuf; // 使用几个缓冲区:muduo中使用的是2个。因为 writable <= 1024 const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1; const ssize_t n = sockets::readv(fd, vec, iovcnt); // 读取完,下面需要把读取到的字节数写入到buffer_ if (n < 0) { *savedErrno = errno; } // 如果没用到第二个缓冲区 else if (implicit_cast<size_t>(n) <= writable) { writerIndex_ += n; } // 读取到的字节数超过 buffer_ 大小 // 就需要重新分配内存,扩容 else { writerIndex_ = buffer_.size(); append(extrabuf, n - writable); } return n; }
- 发送数到
socket
缓冲区,在TcpConnection
类中实现。
TcpConnection¶
class TcpConnection : noncopyable,
public std::enable_shared_from_this<TcpConnection> {
private:
Buffer inputBuffer_; // 用户从这里读取,socket向这里写
Buffer outputBuffer_; // 用户向这里写入,从这里向socket缓冲区里写
size_t highWaterMark_;
WriteCompleteCallback writeCompleteCallback_;
HighWaterMarkCallback highWaterMarkCallback_;
.
.
.
};
muduo
库为每个TcpConnection
各自分配了一个inputBuffer_
,outputBuffer_
,这些缓冲区每个TcpConnection
独有不共享,因此Buffer
不需要加锁设置为线程安全的。为了使得数据通信更加高效,设置了两个回调函数:
+ writeCompleteCallback_
:用于通知用户层已经将utputBuffer_
中的数据复制到socket
缓冲区中。+ 适合大流量数据发送,低流量不需要关注。 + 场景:不断地生成数据,然后发送
conn->send(...)
,如果对方接受不及时,收到滑动窗口的控制,内核缓冲区不足,这个时候就会将数据添加到引用层发送缓冲区outputBuffer_
,可能会使得outputBuffer_
不断胀大。
+ 解决:就是关注writeCompleteCallback_
,当write
操作outputBuffer_
中的所有数据都复制到socket
缓冲区中,通知用户可以发送数据,以调整发送频率。
-
highWaterMarkCallback_
:用于标记outputBuffer
字节数,当超过highWaterMark_
说明对端接受不及时,那么此时发送端调用highWaterMarkCallback_
来应对这种情况。 -
综上
上述两个函数的回调都可以视为两个方向的标志:writeCompleteCallback_
相当于是outputBuffer_
低水位标志highWaterMarkCallback_
相当于是outputBuffer_
高水位标志
send¶
void TcpConnection::send(Buffer* buf) {
if (state_ == kConnected) {
if (loop_->isInLoopThread()) {
sendInLoop(buf->peek(), buf->readableBytes());
buf->retrieveAll();
}
else {
void (TcpConnection::*fp)(const StringPiece& message) = &TcpConnection::sendInLoop;
loop_->runInLoop(std::bind(fp, this, buf->retrieveAllAsString()));
}
}
}
send
函数只能在TcpConnection
所属的EventLoop
中完成,send
函数将readableByte()
的字节全部通过socket
发送出去。
void TcpConnection::sendInLoop(const void* data, size_t len) {
loop_->assertInLoopThread();
...
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
nwrote = sockets::write(channel_->fd(), data, len);
if (nwrote >= 0) {
remaining = len - nwrote;
if (remaining == 0 && writeCompleteCallback_) {
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
}
}
}
...
sendInLoop
中发送数据的核心部分如上图所示,当发送数据时候outputBuffer_
中的字节数为0,
1. 先通过sockets::write
将全部数据data
尽可能多的复制到socket
缓冲区中
2. 如果数据全部复制到socket
缓冲区中并且关注了复制完成回调函数:remaining == 0 && writeCompleteCallback_
,就执行该回调函数通知用户继续发送数据。
3. 如果没有将全部数据复制到socket
缓冲区中,那么remaining > 0
,此时就需要将剩余数据复制到outputBuffer_
中。
if (!faultError && remaining > 0) {
size_t oldLen = outputBuffer_.readableBytes();
if (oldLen + remaining >= highWaterMark_ &&
oldLen < highWaterMark_ &&
highWaterMarkCallback_)
{
loop_->queueInLoop(std::bind(highWaterMarkCallback_,
shared_from_this(),
oldLen + remaining));
}
outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
if (!channel_->isWriting()) {
channel_->enableWriting();
}
}
outputBuffer_
加入这份剩余的内容后超过了高水位标志,那么就调用highWaterMarkCallback_
。
5. 完成后,再关注POLLOUT
事件,等到下次可写。
6. 当有POLLOUT
事件发生时,就执行之前注册的回调函数handleWrite()
,将outputBuffer_
中的数据复制到socket
缓冲区中:
void TcpConnection::handleWrite() {
loop_->assertInLoopThread();
if (channel_->isWriting()) {
ssize_t n = sockets::write(channel_->fd(),
outputBuffer_.peek(),
outputBuffer_.readableBytes());
if (n > 0) {
outputBuffer_.retrieve(n);
if (outputBuffer_.readableBytes() == 0) {
channel_->disableWriting();
if (writeCompleteCallback_) {
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
if (state_ == kDisconnecting) { shutdownInLoop(); }
}
}
}
// 其余省略
}
n>0
,那么就需要对outputBuffer
的readerIndex_
进行相应的操作。
+ 如果数据全部读取完毕,那么readable()==0
。此时需要取消关注POLLOUT
事件,以防止busy loop
现象:因为此时缓冲区为空,又是EPOLL_LT
触发模式,那么会一直触发POLLOUT
,直到写满缓冲区,即busy loop
。具体原因可以参考I/O复用。
+ 如果已经在吃之前按已经调用shutdown
使得状态为kDisconnecting
,那么就直接调用shutdownInLoop()
关闭连接。
read¶
调用的是handleRead()
,如inputBuffer_.readFd(...)
所示,将数据接受到inputBuffer_
中:
void TcpConnection::handleRead(Timestamp receiveTime) {
loop_->assertInLoopThread();
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0) { messageCallback_(...); }
else if (n == 0) { /** 关闭操作*/ }
else { /** 错误处理*/ }
}
shutdown¶
当应用程序需向关闭连接时,但是有可能正在处于发送数据的过程中,outputBuffer_
中还有数据,不能直接调用shutdown
,而是应该等数据都发送完毕,即outputBuffer_
都清空了再关闭连接。
void TcpConnection::shutdown() {
// FIXME: use compare and swap
if (state_ == kConnected) {
setState(kDisconnecting);
loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));
}
}
shutdown()
只是将连接状态设置为kDisconnecting
,并没有关闭,真正的关闭需要等到不再关注POLLOUT
事件:
void TcpConnection::shutdownInLoop() {
loop_->assertInLoopThread();
if (!channel_->isWriting()) {
// we are not writing
socket_->shutdownWrite();
}
}
shutdownWrite()
即当channel_
不再关注POLLOUT
事件了,那么就可以安全地关闭了。