线程¶
线程池¶
1. 有界·线程安全队列 BoundedBlockingQueue¶
template<typename T>
class BoundedBlockingQueue : boost::noncopyable {
public:
explicit
BoundedBlockingQueue(int maxSize) ;
void put(const T& x) ;
T take() ;
bool empty() const ;
bool full() const ;
size_t size() const ;
size_t capacity() const;
private:
mutable MutexLock mutex_;
Condition notEmpty_;
Condition notFull_;
boost::circular_buffer<T> queue_;
};
条件量
:notEmpty_
,notFull_
。
+ notEmpty_
:在pull
函数完成任务添加后,通知执行任务notEmpty_.notify()
。
+ notFull_
:在take()
函数执行完任务后,通知线程可以继续添加任务了notFull_.notify()
。如图:
这个类实现了线程安全的queue
,内部封装了Condition
、Mutex
,使得完成任务可以内部有条不紊的进行。
2. 无界·线程安全队列 BlockingQueue¶
由于无界,因此每次put
添加数据时候不必考虑队列缓冲区是否满了。其余的大体和上面的有界线程安全队列一致。
template<typename T>
class BlockingQueue : boost::noncopyable {
public:
explicit
BlockingQueue(int maxSize) ;
void put(const T& x) ;
T take() ;
bool empty() const ;
size_t size() const ;
size_t capacity() const;
private:
mutable MutexLock mutex_;
Condition notEmpty_;
std::deque<T> queue_;
};
3. 线程池 ThreadPool¶
class ThreadPool : boost::noncopyable {
public:
typedef std::function<void ()> Task;
explicit
ThreadPool(const string& name = string());
~ThreadPool();
//启动线程数
void start(int numThreads);
void stop();
void run(const Task& f);
private:
void runInThread();
Task take();
MutexLock mutex_;
Condition cond_;
string name_;
boost::ptr_vector<muduo::Thread> threads_;
std::deque<Task> queue_; // 任务队列
bool running_;
};
+ 执行任务
start
启动numThreads
个数量的线程,并且创建的同时,执行runThreads
函数,runThreads
内部是通过take()
来获取任务,如果此时任务队列为空,就会一直wait
,等待任务的到来。
+ 添加任务
run
函数作用是将任务添加到队列中,然后notify
通知线程来执行任务。
4. 线程安全的单例类 Singleton¶
-
pthread_once
#include<pthread> int pthread_once(pthread_once_t *once_control, void (*init_routine)(void)); pthread_once_t once_control = PTHREAD_ONCE_INIT;
可以注册一个函数,在**调用进程终止时**调用。对于单例类可以注册一个这个函数`pthread_once`可以使得`init_routine`函数仅仅执行一次,而且是线程安全的,因此可以用来实现线程安全的单例类。 + `atexit` ```c #include <stdlib.h> int atexit(void (*function)(void));
destroy
函数,在进程退出时析构唯一的对象。
5. 线程特定数据 ThreadLocal¶
线程特定数据类型需要借助一组 Linux 函数。
#include <pthread.h>
int pthread_key_create(pthread_key_t *key, void (*destructor)(void*));
int pthread_setspecific(pthread_key_t key, const void *value);
void *pthread_getspecific(pthread_key_t key);
对其进行封装得到 ThreadLocal<T>
,使得每个线程都有类型 T 变量,但是变量具体的值在不同的线程不同。对于POD
数据类型,可以用__thread
实现,对于非POD
数据类型,就需要借组**线程特定数据类型**实现一个类ThreadLocal
。
template<typename T>
class ThreadLocal : boost::noncopyable {
public:
ThreadLocal();
~ThreadLocal();
T& value();
private:
static void destructor(void * arg);
pthread_key_t pkey_;
};
ThreadLocal<T>
本质上只是实现类型 T
的一个实例的线程局部化,ThreadLocal<T>
对象应该和 T
一样使用。函数 value()
就是为实现创建T
一个实例 ThreadLocal<T> obj
。在每个线程中,要想使用obj
都需要调用value()
函数,那么每次都new
了一个值,存储在pkey
处。
6. ThreadLocalSingleton¶
ThreadLocalSingleton
- 另一个种实现方式
Singleton<ThreadLocal<T>>
,如此,整个进程只有类型ThreadLocal<T>
的一个实例对象,但是每个线程类型T
的对象不同,也是只有一个。这是因为:
由于只有一个T& ThreadLocal::value() { T* perThreadValue = static_cast<T*>(pthread_getspecific(pkey_)); if (!perThreadValue) { T* newObj = new T(); pthread_setspecific(pkey_, newObj); perThreadValue = newObj; } return *perThreadValue; }
ThreadLocal<T>
的一个实例对象,假设是to
,那么to
的pkey
在整个进程共享,但是由线程特定数据类型可知,键相同,值可以不同。 ThreadLocalSingleton<T>
template<typename T> class ThreadLocalSingleton : boost::noncopyable { public: static T& instance(); static T* pointer(); private: static void destructor(void* obj); class Deleter { public: pthread_key_t pkey_; ... }; static __thread T* t_value_; static Deleter deleter_; };
instance()
用于生成实例,pointer()
返回实例的地址。- 类
Deleter
-
是用来析构实例,具体的析构函数由
destructor()
函数实现,这个主要是为了方便实例自动析构,不用手工干预。ThreadLocalSingleton
对象 生命期结束时,其static
成员变量deleter
也会析构,同时会调用destructor()
析构类型T
的对象。 + 在Deleter
种用线程特定数据类型实现ThreadLocal
-