跳转至

Buffer

Buffer

    class Buffer : public muduo::copyable {
    private:
        std::vector<char> buffer_;
        size_t readerIndex_;
        size_t writerIndex_;

    };
Buffer 有三个数据成员:buffer_readerIndex_writerIndex_,内部示意图:
    /// @code
    /// +--------------------+------------------+------------------+
    /// | dprependable bytes |  readable bytes  |  writable bytes  |
    /// |                    |     (CONTENT)    |                  |
    /// +--------------------+------------------+------------------+
    /// |                    |                  |                  |
    /// 0      <=      readerIndex   <=   writerIndex    <=     size
    /// @endcode
+ 成员变量 + prependable bytes:这是为了以预留出空间这样的极小的代价在buffer_头部插入数据相关信息。 + readable bytes: 可以读取字节数 → 用户从这里读取,socket缓冲区向这里写入 + writable bytes: 可以读写入字节数 → 用户向这里写入,向socket缓冲区发送 + buffer_字节序列
buffer_中存储的是网络字节序列,即大端字节序列。因此向buffer_写入字节需要转为为网络字节序列,从这里读取数据需要转换为主机字节序列。

用户端

  • 读取
    读取的函数readIntxxx:
        int64_t readInt64() {
            int64_t result = peekInt64();
            retrieveInt64();
            return result;
        }
    
    在内部调用了peekInt64retrieveInt64,前面的一个函数是操作buffer_,后面一个函数是操作readerIndex。网络自己序列的转换由peekInt64完成:
        int64_t peekInt64() const {
            assert(readableBytes() >= sizeof(int64_t));
            int64_t be64 = 0;
            ::memcpy(&be64, peek(), sizeof be64);   // 从读取,
            return sockets::networkToHost64(be64);  // 再转换字节序列
        }
    
  • 写入
    写入使用的函数是appendxxx:
        void appendInt64(int64_t x) {
            int64_t be64 = sockets::hostToNetwork64(x); // 先转换为网络字节序列
            append(&be64, sizeof(int64_t));             // 再写入
        }
    
    内部需要append函数,以及hasWritten,前面用于写入数据到buffer_中,后者用于操作writerIndex_,因为是向buffer_中增加内容,也因此会存在内存不足而需要扩容操作:
            void append(const char* /*restrict*/ data, size_t len) {
            // 要确保 ( size() - writerIndex ) > len 
            ensureWritableBytes(len);
            // 将 [data, data_len) --> [writerIndex, writerIndex + len)
            std::copy(data, data+len, beginWrite());    
            // 操作 writerIndex_
            hasWritten(len);   
        }
    

socket

  • socket 中读取数据
    使用的是Buffer::readFd(int fd, int* savedErrno)。因为可能存在粘包问题,因此将所有的数据都读取到缓冲区中,并且自己设置一个二级缓冲区,尽可能一次性的将所有数据从socket中读取出来。等完成,再写入buffer_
        ssize_t Buffer::readFd(int fd, int* savedErrno) {
            char extrabuf[65536];   // 64k 字节数 
            struct iovec vec[2];
            const size_t writable = writableBytes();
            // 第一级缓冲区是 buffer_
            vec[0].iov_base = begin()+writerIndex_;
            vec[0].iov_len = writable;
            // 第二级缓冲区是 自定义缓冲区
            vec[1].iov_base = extrabuf;
            vec[1].iov_len = sizeof extrabuf;
            // 使用几个缓冲区:muduo中使用的是2个。因为 writable <= 1024 
            const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;
            const ssize_t n = sockets::readv(fd, vec, iovcnt);
            // 读取完,下面需要把读取到的字节数写入到buffer_
            if (n < 0) {
                *savedErrno = errno;
            }
            // 如果没用到第二个缓冲区
            else if (implicit_cast<size_t>(n) <= writable) {
                writerIndex_ += n;
            }
            // 读取到的字节数超过 buffer_ 大小
            // 就需要重新分配内存,扩容
            else {
                writerIndex_ = buffer_.size();
                append(extrabuf, n - writable);
            }
            return n;
        }
    
  • 发送数到socket缓冲区,在TcpConnection类中实现。

TcpConnection

class TcpConnection : noncopyable,
                      public std::enable_shared_from_this<TcpConnection> {
private:
    Buffer inputBuffer_;        // 用户从这里读取,socket向这里写
    Buffer outputBuffer_;       // 用户向这里写入,从这里向socket缓冲区里写
    size_t highWaterMark_;

    WriteCompleteCallback  writeCompleteCallback_;  
    HighWaterMarkCallback  highWaterMarkCallback_;
    .
    .
    .
};
muduo库为每个TcpConnection各自分配了一个inputBuffer_outputBuffer_,这些缓冲区每个TcpConnection独有不共享,因此Buffer不需要加锁设置为线程安全的。为了使得数据通信更加高效,设置了两个回调函数: + writeCompleteCallback_:用于通知用户层已经将utputBuffer_中的数据复制到socket缓冲区中。
+ 适合大流量数据发送,低流量不需要关注。 + 场景:不断地生成数据,然后发送conn->send(...),如果对方接受不及时,收到滑动窗口的控制,内核缓冲区不足,这个时候就会将数据添加到引用层发送缓冲区outputBuffer_,可能会使得outputBuffer_不断胀大。 + 解决:就是关注writeCompleteCallback_,当write操作outputBuffer_中的所有数据都复制到socket缓冲区中,通知用户可以发送数据,以调整发送频率。

  • highWaterMarkCallback_:用于标记outputBuffer字节数,当超过highWaterMark_说明对端接受不及时,那么此时发送端调用highWaterMarkCallback_来应对这种情况。

  • 综上
    上述两个函数的回调都可以视为两个方向的标志:

    • writeCompleteCallback_相当于是outputBuffer_低水位标志
    • highWaterMarkCallback_相当于是outputBuffer_高水位标志

send

    void TcpConnection::send(Buffer* buf) {
        if (state_ == kConnected) {
            if (loop_->isInLoopThread()) {
                sendInLoop(buf->peek(), buf->readableBytes());
                buf->retrieveAll();
            }
            else {
                void (TcpConnection::*fp)(const StringPiece& message) = &TcpConnection::sendInLoop;
                loop_->runInLoop(std::bind(fp, this, buf->retrieveAllAsString()));
            }
        }
    }
send函数只能在TcpConnection所属的EventLoop中完成,send函数将readableByte()的字节全部通过socket发送出去。
    void TcpConnection::sendInLoop(const void* data, size_t len) {
        loop_->assertInLoopThread();
        ...
        if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
            nwrote = sockets::write(channel_->fd(), data, len);
            if (nwrote >= 0) {
                remaining = len - nwrote;
                if (remaining == 0 && writeCompleteCallback_) {
                    loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
                }
            }
        }
    }
    ...
sendInLoop中发送数据的核心部分如上图所示,当发送数据时候outputBuffer_中的字节数为0, 1. 先通过sockets::write将全部数据data尽可能多的复制到socket缓冲区中 2. 如果数据全部复制到socket缓冲区中并且关注了复制完成回调函数:remaining == 0 && writeCompleteCallback_,就执行该回调函数通知用户继续发送数据。 3. 如果没有将全部数据复制到socket缓冲区中,那么remaining > 0,此时就需要将剩余数据复制到outputBuffer_中。
    if (!faultError && remaining > 0) {
        size_t oldLen = outputBuffer_.readableBytes();
        if (oldLen + remaining >= highWaterMark_ &&  
            oldLen < highWaterMark_ && 
            highWaterMarkCallback_) 
        {
            loop_->queueInLoop(std::bind(highWaterMarkCallback_, 
                               shared_from_this(), 
                               oldLen + remaining));
        } 

        outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
        if (!channel_->isWriting()) {
            channel_->enableWriting();
        }
    }
4. 如果outputBuffer_加入这份剩余的内容后超过了高水位标志,那么就调用highWaterMarkCallback_。 5. 完成后,再关注POLLOUT事件,等到下次可写。 6. 当有POLLOUT事件发生时,就执行之前注册的回调函数handleWrite(),将outputBuffer_中的数据复制到socket缓冲区中:
    void TcpConnection::handleWrite() {
        loop_->assertInLoopThread();
        if (channel_->isWriting()) {
            ssize_t n = sockets::write(channel_->fd(), 
                                       outputBuffer_.peek(), 
                                       outputBuffer_.readableBytes());
            if (n > 0) {
                outputBuffer_.retrieve(n);
                if (outputBuffer_.readableBytes() == 0) {
                    channel_->disableWriting();
                    if (writeCompleteCallback_) {
                        loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
                    }
                    if (state_ == kDisconnecting) { shutdownInLoop(); }
                }   
            }   
        }
        // 其余省略
    }
+ 如果复制成功即 n>0,那么就需要对outputBufferreaderIndex_进行相应的操作。 + 如果数据全部读取完毕,那么readable()==0。此时需要取消关注POLLOUT事件,以防止busy loop现象:因为此时缓冲区为空,又是EPOLL_LT触发模式,那么会一直触发POLLOUT,直到写满缓冲区,即busy loop具体原因可以参考I/O复用。 + 如果已经在吃之前按已经调用shutdown使得状态为kDisconnecting,那么就直接调用shutdownInLoop()关闭连接。

read

调用的是handleRead(),如inputBuffer_.readFd(...)所示,将数据接受到inputBuffer_中:

    void TcpConnection::handleRead(Timestamp receiveTime) {
        loop_->assertInLoopThread();

        ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);

        if (n > 0) { messageCallback_(...); }
        else if (n == 0) {  /** 关闭操作*/  }
        else { /** 错误处理*/  }
}

shutdown

当应用程序需向关闭连接时,但是有可能正在处于发送数据的过程中,outputBuffer_中还有数据,不能直接调用shutdown,而是应该等数据都发送完毕,即outputBuffer_都清空了再关闭连接。

void TcpConnection::shutdown() {
    // FIXME: use compare and swap
    if (state_ == kConnected) {
        setState(kDisconnecting);
        loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));
    }
}
shutdown()只是将连接状态设置为kDisconnecting,并没有关闭,真正的关闭需要等到不再关注POLLOUT事件:
    void TcpConnection::shutdownInLoop() {
        loop_->assertInLoopThread();
        if (!channel_->isWriting()) {
            // we are not writing
            socket_->shutdownWrite();
        }
    }
可以调用shutdownWrite()即当channel_不再关注POLLOUT事件了,那么就可以安全地关闭了。