Buffer¶
Buffer¶
class Buffer : public muduo::copyable {
private:
std::vector<char> buffer_;
size_t readerIndex_;
size_t writerIndex_;
};
Buffer 有三个数据成员:buffer_,readerIndex_,writerIndex_,内部示意图:
/// @code
/// +--------------------+------------------+------------------+
/// | dprependable bytes | readable bytes | writable bytes |
/// | | (CONTENT) | |
/// +--------------------+------------------+------------------+
/// | | | |
/// 0 <= readerIndex <= writerIndex <= size
/// @endcode
prependable bytes:这是为了以预留出空间这样的极小的代价在buffer_头部插入数据相关信息。
+ readable bytes: 可以读取字节数 → 用户从这里读取,socket缓冲区向这里写入
+ writable bytes: 可以读写入字节数 → 用户向这里写入,向socket缓冲区发送
+ buffer_字节序列buffer_中存储的是网络字节序列,即大端字节序列。因此向buffer_写入字节需要转为为网络字节序列,从这里读取数据需要转换为主机字节序列。
用户端¶
- 读取
读取的函数readIntxxx:在内部调用了int64_t readInt64() { int64_t result = peekInt64(); retrieveInt64(); return result; }peekInt64和retrieveInt64,前面的一个函数是操作buffer_,后面一个函数是操作readerIndex。网络自己序列的转换由peekInt64完成:int64_t peekInt64() const { assert(readableBytes() >= sizeof(int64_t)); int64_t be64 = 0; ::memcpy(&be64, peek(), sizeof be64); // 从读取, return sockets::networkToHost64(be64); // 再转换字节序列 } - 写入
写入使用的函数是appendxxx:内部需要void appendInt64(int64_t x) { int64_t be64 = sockets::hostToNetwork64(x); // 先转换为网络字节序列 append(&be64, sizeof(int64_t)); // 再写入 }append函数,以及hasWritten,前面用于写入数据到buffer_中,后者用于操作writerIndex_,因为是向buffer_中增加内容,也因此会存在内存不足而需要扩容操作:void append(const char* /*restrict*/ data, size_t len) { // 要确保 ( size() - writerIndex ) > len ensureWritableBytes(len); // 将 [data, data_len) --> [writerIndex, writerIndex + len) std::copy(data, data+len, beginWrite()); // 操作 writerIndex_ hasWritten(len); }
socket 端¶
- 从 socket 中读取数据
使用的是Buffer::readFd(int fd, int* savedErrno)。因为可能存在粘包问题,因此将所有的数据都读取到缓冲区中,并且自己设置一个二级缓冲区,尽可能一次性的将所有数据从socket中读取出来。等完成,再写入buffer_。ssize_t Buffer::readFd(int fd, int* savedErrno) { char extrabuf[65536]; // 64k 字节数 struct iovec vec[2]; const size_t writable = writableBytes(); // 第一级缓冲区是 buffer_ vec[0].iov_base = begin()+writerIndex_; vec[0].iov_len = writable; // 第二级缓冲区是 自定义缓冲区 vec[1].iov_base = extrabuf; vec[1].iov_len = sizeof extrabuf; // 使用几个缓冲区:muduo中使用的是2个。因为 writable <= 1024 const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1; const ssize_t n = sockets::readv(fd, vec, iovcnt); // 读取完,下面需要把读取到的字节数写入到buffer_ if (n < 0) { *savedErrno = errno; } // 如果没用到第二个缓冲区 else if (implicit_cast<size_t>(n) <= writable) { writerIndex_ += n; } // 读取到的字节数超过 buffer_ 大小 // 就需要重新分配内存,扩容 else { writerIndex_ = buffer_.size(); append(extrabuf, n - writable); } return n; } - 发送数到
socket缓冲区,在TcpConnection类中实现。
TcpConnection¶
class TcpConnection : noncopyable,
public std::enable_shared_from_this<TcpConnection> {
private:
Buffer inputBuffer_; // 用户从这里读取,socket向这里写
Buffer outputBuffer_; // 用户向这里写入,从这里向socket缓冲区里写
size_t highWaterMark_;
WriteCompleteCallback writeCompleteCallback_;
HighWaterMarkCallback highWaterMarkCallback_;
.
.
.
};
muduo库为每个TcpConnection各自分配了一个inputBuffer_,outputBuffer_,这些缓冲区每个TcpConnection独有不共享,因此Buffer不需要加锁设置为线程安全的。为了使得数据通信更加高效,设置了两个回调函数:
+ writeCompleteCallback_:用于通知用户层已经将utputBuffer_中的数据复制到socket缓冲区中。+ 适合大流量数据发送,低流量不需要关注。 + 场景:不断地生成数据,然后发送
conn->send(...),如果对方接受不及时,收到滑动窗口的控制,内核缓冲区不足,这个时候就会将数据添加到引用层发送缓冲区outputBuffer_,可能会使得outputBuffer_不断胀大。
+ 解决:就是关注writeCompleteCallback_,当write操作outputBuffer_中的所有数据都复制到socket缓冲区中,通知用户可以发送数据,以调整发送频率。
-
highWaterMarkCallback_:用于标记outputBuffer字节数,当超过highWaterMark_说明对端接受不及时,那么此时发送端调用highWaterMarkCallback_来应对这种情况。 -
综上
上述两个函数的回调都可以视为两个方向的标志:writeCompleteCallback_相当于是outputBuffer_低水位标志highWaterMarkCallback_相当于是outputBuffer_高水位标志
send¶
void TcpConnection::send(Buffer* buf) {
if (state_ == kConnected) {
if (loop_->isInLoopThread()) {
sendInLoop(buf->peek(), buf->readableBytes());
buf->retrieveAll();
}
else {
void (TcpConnection::*fp)(const StringPiece& message) = &TcpConnection::sendInLoop;
loop_->runInLoop(std::bind(fp, this, buf->retrieveAllAsString()));
}
}
}
send函数只能在TcpConnection所属的EventLoop中完成,send函数将readableByte()的字节全部通过socket发送出去。
void TcpConnection::sendInLoop(const void* data, size_t len) {
loop_->assertInLoopThread();
...
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
nwrote = sockets::write(channel_->fd(), data, len);
if (nwrote >= 0) {
remaining = len - nwrote;
if (remaining == 0 && writeCompleteCallback_) {
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
}
}
}
...
sendInLoop中发送数据的核心部分如上图所示,当发送数据时候outputBuffer_中的字节数为0,
1. 先通过sockets::write将全部数据data尽可能多的复制到socket缓冲区中
2. 如果数据全部复制到socket缓冲区中并且关注了复制完成回调函数:remaining == 0 && writeCompleteCallback_,就执行该回调函数通知用户继续发送数据。
3. 如果没有将全部数据复制到socket缓冲区中,那么remaining > 0,此时就需要将剩余数据复制到outputBuffer_中。
if (!faultError && remaining > 0) {
size_t oldLen = outputBuffer_.readableBytes();
if (oldLen + remaining >= highWaterMark_ &&
oldLen < highWaterMark_ &&
highWaterMarkCallback_)
{
loop_->queueInLoop(std::bind(highWaterMarkCallback_,
shared_from_this(),
oldLen + remaining));
}
outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
if (!channel_->isWriting()) {
channel_->enableWriting();
}
}
outputBuffer_加入这份剩余的内容后超过了高水位标志,那么就调用highWaterMarkCallback_。
5. 完成后,再关注POLLOUT事件,等到下次可写。
6. 当有POLLOUT事件发生时,就执行之前注册的回调函数handleWrite(),将outputBuffer_中的数据复制到socket缓冲区中:
void TcpConnection::handleWrite() {
loop_->assertInLoopThread();
if (channel_->isWriting()) {
ssize_t n = sockets::write(channel_->fd(),
outputBuffer_.peek(),
outputBuffer_.readableBytes());
if (n > 0) {
outputBuffer_.retrieve(n);
if (outputBuffer_.readableBytes() == 0) {
channel_->disableWriting();
if (writeCompleteCallback_) {
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
if (state_ == kDisconnecting) { shutdownInLoop(); }
}
}
}
// 其余省略
}
n>0,那么就需要对outputBuffer的readerIndex_进行相应的操作。
+ 如果数据全部读取完毕,那么readable()==0。此时需要取消关注POLLOUT事件,以防止busy loop现象:因为此时缓冲区为空,又是EPOLL_LT触发模式,那么会一直触发POLLOUT,直到写满缓冲区,即busy loop。具体原因可以参考I/O复用。
+ 如果已经在吃之前按已经调用shutdown使得状态为kDisconnecting,那么就直接调用shutdownInLoop()关闭连接。
read¶
调用的是handleRead(),如inputBuffer_.readFd(...)所示,将数据接受到inputBuffer_中:
void TcpConnection::handleRead(Timestamp receiveTime) {
loop_->assertInLoopThread();
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0) { messageCallback_(...); }
else if (n == 0) { /** 关闭操作*/ }
else { /** 错误处理*/ }
}
shutdown¶
当应用程序需向关闭连接时,但是有可能正在处于发送数据的过程中,outputBuffer_中还有数据,不能直接调用shutdown,而是应该等数据都发送完毕,即outputBuffer_都清空了再关闭连接。
void TcpConnection::shutdown() {
// FIXME: use compare and swap
if (state_ == kConnected) {
setState(kDisconnecting);
loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));
}
}
shutdown()只是将连接状态设置为kDisconnecting,并没有关闭,真正的关闭需要等到不再关注POLLOUT事件:
void TcpConnection::shutdownInLoop() {
loop_->assertInLoopThread();
if (!channel_->isWriting()) {
// we are not writing
socket_->shutdownWrite();
}
}
shutdownWrite()即当channel_不再关注POLLOUT事件了,那么就可以安全地关闭了。