基础知识¶
-
线程 线程对象 的生命周期与 **线程**的生命周期不同。
-
编程风格
- 面向对象编程风格 虚函数具有回调功能
- 基于对象的编程风格 使用
boost::bind
等,传入回调函数
Muduo
推荐的是 one loop per thread + threadpool
,就是配置多个线程,每个线程跑一个 eventloop
,一个线程只负责接受连接并分发给其他线程,之后该连接的所有 i/o
操作 都由该线程处理,threadpool
可以用来处理耗时长的任务。当然 muduo
是个通用的网络库,肯定也支持其他模式,比如设置只用 1
个线程且不用 threadpool
处理逻辑就是 Redis
那种 (其实 Redis
应该是 one loop one thread + threadpool
,它有多个 bio
线程来处理任务);设置 SO_REUSEPORT
起多个进程就是 Nginx
那种。
Reactor¶
Reactor
主要涉及下面几个类:
Channel
:封装I/O
事件和回调,不拥有fd
,可以代表多种实体:listening fd
、timer fd
、event fd
等。Poller
:I/O Multiplexing
的基类,封装底层的系统调用(poll(2)
和epoll(2)
)。根据Channel
更新事件,并返回活跃的Channel
。Eventloop
:整合Channel
和Poller
,提供更高层的接口,如定时器。
Channel¶
因为有多种 I/O Multiplexing
的系统调用,所以用 Channel
封装事件并保存对应的 callback
传递给 Poller
,类似 Redis
的 aeFileEvent
:
class Channel : noncopyable
{
public:
typedef std::function<void()> EventCallback;
typedef std::function<void(Timestamp)> ReadEventCallback;
Channel(EventLoop* loop, int fd);
~Channel();
void handleEvent(Timestamp receiveTime);
private:
static const int kNoneEvent;
static const int kReadEvent;
static const int kWriteEvent;
EventLoop* loop_;
const int fd_;
int events_;
int revents_; // it's the received event types of epoll or poll
int index_; // used by Poller.
ReadEventCallback readCallback_;
EventCallback writeCallback_;
EventCallback closeCallback_;
EventCallback errorCallback_;
};
events_
是 Channel
关心的事件,Poller
根据这个来设置。revents_
是 Poller
返回的已就绪的事件,handleEvent()
会调用相应的 callback
来处理。
所有需要由 EventLoop
处理的如 Acceptor
、TcpConnection
都有 Channel
成员并设置 callback
注册到 EventLoop
中。Muduo
的 callback
基本都是 member function
, 用 std::bind()
绑定 this
指针来注册,一些网络库会采用继承接口类来实现回调的注册。
Poller¶
Poller
是 I/O Multiplexing
的抽象基类,Muduo
支持 poll(PollPoller)
和 epoll(EPollPoller)
,使用 level-trigger
,在功能上只封装了底层的系统调用,不做多余的工作:
class Poller : noncopyable
{
public:
typedef std::vector<Channel*> ChannelList;
Poller(EventLoop* loop);
virtual ~Poller();
virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0;
virtual void updateChannel(Channel* channel) = 0;
virtual void removeChannel(Channel* channel) = 0;
virtual bool hasChannel(Channel* channel) const;
protected:
typedef std::map<int, Channel*> ChannelMap;
ChannelMap channels_;
private:
EventLoop* ownerLoop_;
};
我只关注 epoll(2)
的,有 2
点值得一提:
- 因为
epoll(2)
是记录关注的fd
和事件的,所以不需要重复传递兴趣列表,但是区分了EPOLL_CTL_ADD
和EPOLL_CTL_MOD
,所以Channel
用index_
记录了状态是要add
还是mod
。 而Redis
是用数组记录了所有注册的aeFileEvent
,查找数组即可知道该add
还是mod
。 epoll_wait()
要传递数组参数用于返回就绪的事件。Muduo
采用的是动态增长的数组,默认大小是16
,若这次用完了,就会增大1
倍。而Redis
是固定大小的,因为有maxclients
可以知道最多有多少事件。
EventLoop¶
EventLoop
整合了 Channel
和 Poller
提供更方便的接口来使用,主要功能为:处理 Channel
、处理定时事件、处理任务事件。主要接口如下:
class EventLoop : noncopyable
{
public:
// EventLoop
void loop();
void quit();
// 任务事件
void wakeup();
void runInLoop(Functor cb);
void queueInLoop(Functor cb);
// 定时器
TimerId runAt(Timestamp time, TimerCallback cb);
TimerId runAfter(double delay, TimerCallback cb);
TimerId runEvery(double interval, TimerCallback cb);
void cancel(TimerId timerId);
// 处理 Channel
void updateChannel(Channel* channel);
void removeChannel(Channel* channel);
bool hasChannel(Channel* channel);
};
EventLoop
的用法是先注册 Channel
,如 listening fd
,然后调用 loop()
一直循环处理各种任务,在 loop
中可以注册新的 Channel
、定时事件和任务事件:
void EventLoop::loop()
{
while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
for (Channel* channel : activeChannels_)
{
currentActiveChannel_ = channel;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
doPendingFunctors();
}
}
先来说任务事件,使用场景主要是其他线程想要在 EventLoop
线程做些工作,比如 EventLoop
线程把耗时的任务放到 threadpool
中执行,完成后通知 EventLoop
线程获取结果。 最常见的实现方式是用 pipe(2)
:EventLoop
注册读事件,其他线程把结果保存在队列中,然后写 pipe
,EventLoop
线程就会唤醒处理。Muduo
也是类似实现,只是用 eventfd(2)
来唤醒, 具体用法就看 man page
。
再来说定时器。传统的实现为设置 Poller
的超时时间为最近的定时任务,然后每次 Poller
返回时检查一下定时器,定时器会按超时时间排序,如用最小堆等。Muduo
的 TimerQueue
用于管理定时器,用 std::set
保存 定时器,因为 Timestamp
可能相同,所以保存的是 std::pair<Timestamp, Timer*>
,用 timerfd
来触发定时器任务的执行,使得实现更加一致。
One Loop Per Thread¶
我觉得 Muduo
实现上最大的优点是将 EventLoop
和创建线程绑定在一起,所有类都和所属的 EventLoop
绑定,通过 EventLoop
提供的接口实现线程间交互, 使得从 one loop one thread
到 one loop per thread
的实现非常方便。
每个线程都会用 __thread
保存自己的 tid
,是用 syscall(SYS_gettid)
获取的,好处为(书上列的,记录一下):
- 类型是
pid_t
,通常是小整数。 - 在
Linux
中,直接表示内核的任务调度id
,可以在/proc/tid
或/proc/pid/task/tid
找到。top(1)
之类的工具也是显示tid
。 - 任何时刻都是全局唯一的。
0
是非法值。
不用 pthread_self()
的原因是:
pthread_t
的类型未知,也就无法比较大小,或作为管理容器的Key
。pthread_t
只在进程内有意义,只能保证同一进程内,同一时刻的各个线程的id
不同,不能保证同一进程先后多个线程具有不同的id
。
EventLoop
在构造时会记录创建线程的 tid
,所有可能跨线程的操作都会用任务事件 runInLoop
传递,非常方便:
bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }
void EventLoop::runInLoop(Functor cb)
{
if (isInLoopThread())
{
cb();
}
else
{
queueInLoop(std::move(cb));
}
}
EventLoopThread
用于创建 EventLoop
线程,要注意的是避免在构造函数中创建线程并暴露 this
指针,因为可能新线程使用时还未构造完成,Muduo
是主动调用 start
才会创建线程。因为 EventLoop
是线程间 交互的媒介,所以线程启动后会返回对应的指针。
EventLoopThreadPool
会创建固定个数的 EventLoopThread
,并支持 round-robin
和 hash-index
的返回线程对应的 EventLoop
,配合 runInLoop
即可实现任务的分发,如绑定 TcpConnection
。
TcpServer¶
配合上面的基础设施再实现 TCP
相关的逻辑就可以实现支持多种模型的 TCP
网络框架了,主要包括:
- 连接的管理,包括创建、关闭等。
- 数据的收发。
Acceptor¶
Acceptor
用于接受连接并调用传入的 callback
创建连接:
typedef std::function<void (int sockfd, const InetAddress&)> NewConnectionCallback;
Acceptor
在构造时预留了一个 idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
,当 fd
耗尽时,关闭 idleFd_
再接收客户端连接立刻关闭,对端就可以立刻知道连接被关闭,关闭前也可以发送错误信息给对端, 之后重新打开 idleFd_
。
TcpConnection¶
Muduo
把每个 Tcp
连接都封装为 TcpConnection
,通过 shared_ptr
暴露给用户使用,主要接口如下:
class TcpConnection : noncopyable,
public std::enable_shared_from_this<TcpConnection>
{
public:
// 发送数据
void send(const void* message, int len);
void send(const StringPiece& message);
void send(Buffer* message); // this one will swap data
// 设置 context
void setContext(const boost::any& context)
{ context_ = context; }
const boost::any& getContext() const
{ return context_; }
boost::any* getMutableContext()
{ return &context_; }
// 设置 callback
// 连接建立后调用
typedef std::function<void (const TcpConnectionPtr&)> ConnectionCallback;
void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }
// 收到消息后调用
typedef std::function<void (const TcpConnectionPtr&,
Buffer*,
Timestamp)> MessageCallback;
void setMessageCallback(const MessageCallback& cb)
{ messageCallback_ = cb; }
// 消息全部发完后调用
typedef std::function<void (const TcpConnectionPtr&)> WriteCompleteCallback;
void setWriteCompleteCallback(const WriteCompleteCallback& cb)
{ writeCompleteCallback_ = cb; }
// 当发送缓冲区积压的数据超出 highWaterMark 时调用
typedef std::function<void (const TcpConnectionPtr&, size_t)> HighWaterMarkCallback;
void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
{ highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }
};
Muduo
使用 Buffer
保存数据,结构如下图:
Buffer
可读可写,用 readIndex
和 writeIndex
区分范围,不是循环队列,当空间不够用时会增大 Buffer
调整 index
。 在创建时预留了 8
字节的 prependable bytes
,用于高效的实现 length + payload
格式。不过在当前的实现中 Buffer
不会自动缩小,要主动调 shrink
,因为经常增大或缩小,数据拷贝会很频繁,许多实现 会当 Buffer
增大到某个阈值时,之后不再扩大,而是用 list
追加新的 Buffer
,Redis
就是类似实现。
TcpConnection
读响应时比较特别,使用 readv(2)
读取数据:一个是 TcpConnection
的 Buffer
;一个是栈上的 char extrabuf[65536]
。猜测因为是 level-trigger
所以不需要一次性读完所有数据(避免阻塞其他连接的请求), 但如果一次读的太少就要经过多次 Poller
返回,而又想避免数据拷贝,比如先读到栈上的数组,然后拷贝到 Buffer
,但是如果每个 TcpConnection
的 Buffer
都默认很大,又太浪费内存,所以才这么实现的。
要发送数据时调用 send
,把数据追加到 Buffer
中,这个操作是线程安全的,通过 runInLoop
传递给 EventLoop
线程,很方便。Muduo
也是在第一次发送时直接 write
,还有未写完的才会注册写事件,减少 Poller
系统调用。
还有些其他功能,如:
- 有多个
callback
在对应时机调用。 - 用
any
保存任意类型的context
方便使用。
ThreadPool¶
这里的 ThreadPool
是用于执行任务,至于 1
个线程接收连接,分发给多个 i/o
线程是通过 EventLoopThreadPool
和 Eventloop.runInLoop
实现的。ThreadPool
的实现也很简单,从成员变量就可以看出来实现:
class ThreadPool : noncopyable
{
public:
typedef std::function<void ()> Task;
private:
mutable MutexLock mutex_;
Condition notEmpty_ GUARDED_BY(mutex_);
Condition notFull_ GUARDED_BY(mutex_);
std::vector<std::unique_ptr<muduo::Thread>> threads_;
std::deque<Task> queue_ GUARDED_BY(mutex_);
size_t maxQueueSize_;
};
TcpClient¶
TcpClient
负责建立连接和处理 I/O
,它不拥有 EventLoop
,每个 TcpClient
只负责一个 TcpConnection
。其实 client
和 server
很相似,区别在于一个是被动的 listen
,一个是主动的 connect
。 TcpClient
和 TcpServer
都是由外部传入的 EventLoop
构造,可以使多个 TcpClient
共用一个 EventLoop
,再搭配上 EventLoopThreadPool + round-robin
就可以实现类似 TcpServer
的多线程 client
。
Connector¶
Connector
和 Acceptor
类似,只负责建立连接。使用 nonblocking connect
,当返回可写事件时要检查 socket error
,并支持重试,重试会有动态增加的延迟时间。因为如果服务端突然挂掉,大量连接同时重试 会影响服务质量并造成拥塞。