Boost.Thread
线程:
Thread
thread_group
线程局部存储:
thread_specific_ptr
锁:
lock_guard
unique_lock
scoped_lock
shared_lock
如果只是为了保证数据同步,那么lock_guard
完全够用;
如果除了同步,还需要使用condition进行阻塞时,那么就需要用unique_lock
。
boost还要一个boost::mutex::scoped_lock
,这个是boost::unique_lock<boost::mutex>
的typedef,在C++11中已经禁用。
独占锁:
boost::unique_lock<T>
,其中T可以mutex中的任意一种。
如果T为mutex,那么boost::unique_lock<boost::mutex>
,构造与析构时则分别自动调用lock和unlock方法。
如果T为shared_mutex,那么boost::unique_lock<boost::shared_mutex>
,构造与析构时则分别调用shared_mutex
的shared_lock
和shared_unlock
方法。
注意:scoped_lock
也是独占锁,其源代码中定义如下;
typedef unique_lock<mutex> scoped_lock;
typedef unique_lock<timed_mutex> scoped_timed_lock;
共享锁:
boost::shared_lock<T>
,其中的T只能是shared_mutex
类。
当然还有其他一些锁:lock_guard
, upgrade_lock
等。
互斥:
独占式互斥量
mutex
try_mutex
timed_mutex
递归式互斥量
recursive_mutex
recursive_try_mutex
recursive_timed_mutex
共享式互斥量
shared_mutex
条件变量:
condition_variable
condition_variable_any
原子变量:
future:
packaged_task
unique_future
Promise
Promise对象可保存T类型的值,该值可被future对象读取(可能在另一个线程中),这是promise提供的同步的一种手段。
在构造promise时,promise对象可以与共享状态关联起来,这个共享状态可以存储一个T类型或者一个由std::exception派生出的类的值,并可以通过get_future来获取与promise对象关联的对象,调用该函数之后,两个对象共享相同的共享状态(shared state)。
Promise对象是异步provider,它可以在某一时刻设置共享状态的值。
Future对象可以返回共享状态的值,或者在必要的情况下阻塞调用者并等待共享状态标识变为ready,然后才能获取共享状态的值。
其他:
//保证在多线程中只允许一次
once_flag
call_once
示例:
#include <boost/thread.hpp>
#include <iostream>
#include <complex.h>
#include <stack>
#include <mutex>
#include <boost/thread/once.hpp>
#include <boost/thread/future.hpp>
#include <boost/asio.hpp>
void my_func()
{
std::cout << "detach-分离 或 join-等待 线程" << std::endl;
}
TEST(BoostThread, detachOrjoin)//简单线程,线程状态joinable、detached
{
boost::thread t(my_func);
boost::thread::yield();//当前线程放弃余下的时间片。
std::cout << t.joinable() << std::endl;
t.join();
boost::thread t1(my_func);
std::cout << t1.joinable() << std::endl;
t1.detach();
std::cout << t1.joinable() << std::endl;
}
//线程的创建需要传递给thread对象一个可调用物(函数或函数对象),它必须具
//有operator()以供线程执行。
boost::mutex io_mutex;
struct count
{
count(int id) : id(id) { }
void operator()()
{
for (int i = 0; i < 10; ++i)
{
boost::mutex::scoped_lock
lock(io_mutex);
std::cout << id << ": "
<< i << std::endl;
}
}
int id;
};
TEST(BoostThread, Typeobject)//复杂类型对象作为参数来创建线程
{
boost::thread thrd1(count(1));
boost::thread thrd2(count(2));
thrd1.join();
thrd2.join();
}
class HelloWorldStatic
{
public:
static void hello()
{
std::cout <<
"Hello world, I''m a thread!"
<< std::endl;
}
static void start()
{
boost::thread thrd(hello);
thrd.join();
}
};
TEST(BoostThread, InClassStatic)//类内部创建线程
{
HelloWorldStatic::start();//在这里start()和hello()方法都必须是static方法。
}
class HelloWorld
{
public:
void hello()
{
std::cout <<
"Hello world, I''m a thread!"
<< std::endl;
}
void start()
{
boost::function0< void> f = boost::bind(&HelloWorld::hello, this);
boost::thread thrd(f);
thrd.join();
}
};
TEST(BoostThread, InClass)//start()和hello()方法不是静态方法则采用此方法创建线程
{
HelloWorld hello;
hello.start();
}
class HelloWorldOut
{
public:
void hello(const std::string& str)
{
std::cout << str;
}
};
TEST(BoostThread, OutClass)
{
HelloWorldOut obj;
boost::thread thrd(boost::bind(&HelloWorldOut::hello, &obj, "Hello world, I''m a thread!" ) ) ;
thrd.join();
}
void func1(const int &id)
{
std::cout << "func1 id : " << id << std::endl;
}
struct MyThread
{
void operator()(const int &id)
{
std::cout << "MyThread id : " << id << std::endl;
}
void func1(const int &id)
{
std::cout << "MyThread::func1 id : " << id << std::endl;
}
};
TEST(BoostThread, Threadparameters)
{
//普通函数
boost::thread t1(func1, 11);
t1.join();
//函数对象
MyThread myThread;
boost::thread t2(myThread, 22);
t2.join();
//成员函数
boost::thread t3(&MyThread::func1, myThread, 33);
t3.join();
//临时对象
boost::thread t4(MyThread(), 44);
t4.join();
//对象引用
boost::thread t5(boost::ref(myThread), 55);
t5.join();
}
void wait(int seconds)
{
boost::this_thread::sleep(boost::posix_time::seconds(seconds));
}
void threadinterrupt()
{
try
{
for (int i = 0; i < 4; ++i)
{
wait(1);
std::cout << i<<" 是否允许中断:"<< boost::this_thread::interruption_enabled()<<" 是否被要求中断:"<< boost::this_thread::interruption_requested() << std::endl;
}
}
catch (boost::thread_interrupted&)
{
std::cout << "已经中断" << std::endl;
}
}
#define testinterruption() { std::cout << "\r\n是否允许中断:" << boost::this_thread::interruption_enabled() << " 是否被要求中断:" << boost::this_thread::interruption_requested()<< __FUNCTION__<< __LINE__ << std::endl;}
void f()
{
testinterruption();
// interruption enabled here
{
boost::this_thread::disable_interruption di;
boost::this_thread::interruption_point();
testinterruption();
// interruption disabled
{
testinterruption();
boost::this_thread::disable_interruption di2;
testinterruption();
// interruption still disabled
} // di2 destroyed, interruption state restored
// interruption still disabled
boost::this_thread::interruption_point();
testinterruption();
} // di destroyed, interruption state restored
// interruption now enabled
testinterruption();
boost::this_thread::interruption_point();
testinterruption();
}
void g()
{
testinterruption();
// interruption enabled here
{
testinterruption();
boost::this_thread::disable_interruption di;
boost::this_thread::interruption_point();
testinterruption();
// interruption disabled
{
testinterruption();
boost::this_thread::restore_interruption ri(di);
testinterruption();
// interruption now enabled
} // ri destroyed, interruption disable again
testinterruption();
boost::this_thread::interruption_point();
} // di destroyed, interruption state restored
// interruption now enabled
testinterruption();
boost::this_thread::interruption_point();
testinterruption();
}
//预定于中断点:
//boost::thread::join()
//boost::thread::timed_join()
//boost::thread::try_join_for(),
//boost::thread::try_join_until(),
//boost::condition_variable::wait()
//boost::condition_variable::timed_wait()
//boost::condition_variable::wait_for()
//boost::condition_variable::wait_until()
//boost::condition_variable_any::wait()
//boost::condition_variable_any::timed_wait()
//boost::condition_variable_any::wait_for()
//boost::condition_variable_any::wait_until()
//boost::thread::sleep()
//boost::this_thread::sleep_for()
//boost::this_thread::sleep_until()
//boost::this_thread::interruption_point()
TEST(BoostThread, interrupt)//中断
{
std::cout << "中断" << std::endl;
boost::thread t(threadinterrupt);
wait(3);
t.interrupt();
t.join();
boost::thread tf(f);
tf.interrupt();
tf.join();
boost::thread tg(g);
tg.interrupt();
tg.join();
}
TEST(BoostThread, threadid_count)
{
std::cout << boost::this_thread::get_id() << std::endl;
std::cout << boost::thread::hardware_concurrency() << std::endl;
}
void thread_group_Fun()
{
std::cout <<"当前线程ID:"<< boost::this_thread::get_id() << std::endl;
}
TEST(BoostThread, thread_group)
{
boost::thread_group grp;
boost::thread *p = new boost::thread(thread_group_Fun);
grp.add_thread(p);
std::cout << "remove_thread:" << p->get_id()<< std::endl;
grp.remove_thread(p);
grp.create_thread(thread_group_Fun);
grp.create_thread(thread_group_Fun);
grp.join_all();
}
class BankAccount {
boost::mutex mtx_; // explicit mutex declaration
int balance_;
public:
void Deposit(int amount) {
boost::lock_guard<boost::mutex> guard(mtx_);
balance_ += amount;
}
void Withdraw(int amount) {
boost::lock_guard<boost::mutex> guard(mtx_);
balance_ -= amount;
}
int GetBalance() {
boost::lock_guard<boost::mutex> guard(mtx_);
return balance_;
}
};
BankAccount JoesAccount;
void bankAgent()
{
for (int i = 10; i > 0; --i) {
JoesAccount.Deposit(500);
std::cout << "bankAgent:" << JoesAccount.GetBalance() << std::endl;
}
}
void Joe() {
for (int i = 10; i > 0; --i) {
JoesAccount.Withdraw(100);
std::cout <<"Joe:"<< JoesAccount.GetBalance() << std::endl;
}
}
TEST(BoostThread, mutex)//互斥
{
boost::thread thread1(bankAgent); // start concurrent execution of bankAgent
boost::thread thread2(Joe); // start concurrent execution of Joe
thread1.join();
thread2.join();
}
typedef boost::shared_lock<boost::shared_mutex> readLock;
typedef boost::unique_lock<boost::shared_mutex> writeLock;
boost::shared_mutex rwmutex;
std::vector<int> random_numbers;
void readOnly()
{
for (int i = 0; i < 3; ++i)
{
wait(1);
readLock rdlock(rwmutex);
std::cout << random_numbers.back() << std::endl;
}
}
void writeOnly()
{
std::srand(static_cast<unsigned int>(std::time(0)));
for (int i = 0; i < 3; ++i)
{
writeLock wtlock(rwmutex);
random_numbers.push_back(std::rand());
wait(1);
}
}
int sum = 0;
void count()
{
for (int i = 0; i < 3; ++i)
{
wait(1);
readLock rdlock(rwmutex);
sum += random_numbers.back();
}
}
TEST(BoostThread, readwrite)//读写锁
{
boost::thread t1(writeOnly);
boost::thread t2(readOnly);
boost::thread t3(count);
t1.join();
t2.join();
t3.join();
std::cout << "Sum: " << sum << std::endl;
}
boost::condition_variable_any cond;
boost::mutex rwmutexCV;
std::vector<int> random_numbersCV;
void print()
{
std::size_t next_size = 1;
for (int i = 0; i < 3; ++i)
{
boost::unique_lock<boost::mutex> lock(rwmutexCV);
while (random_numbersCV.size() != next_size)
cond.wait(rwmutexCV);
std::cout << random_numbersCV.back() << std::endl;
++next_size;
cond.notify_all();
}
}
void fill()
{
std::srand(static_cast<unsigned int>(std::time(0)));
for (int i = 0; i < 3; ++i)
{
boost::unique_lock<boost::mutex> lock(rwmutexCV);
random_numbersCV.push_back(std::rand());
cond.notify_all();
cond.wait(rwmutexCV);
}
}
TEST(BoostThread, ConditionVariables)//条件变量
{
boost::thread t1(fill);
boost::thread t2(print);
t1.join();
t2.join();
}
//生产者-消费者模式的后进先出型(std::stack)缓冲区
class Buffer
{
public:
//构造函数
Buffer(size_t n) :un_read(0), capacity(n) {}
//写入数据x
void put(int x) {
//局部域
{
boost::mutex::scoped_lock lock(mu); //锁定互斥量
while (is_full()) {
cond_put.wait(mu); //条件变量等待
}
stk.push(x); //写入数据
++un_read;
} //解锁互斥量
cond_get.notify_one(); //通知可以读取数据
}
//读取数据
void get(int *x) {
{
boost::mutex::scoped_lock lock(mu);
while (is_empty()) {
cond_get.wait(mu);
}
*x = stk.top();
stk.pop();
--un_read;
}
cond_put.notify_one();
}
private:
//判断缓冲区是否满
bool is_full() {
return un_read == capacity;
}
//判断缓冲区是否为空
bool is_empty() {
return un_read == 0;
}
private:
boost::mutex mu; //互斥量,配合条件变量使用
boost::condition_variable_any cond_put; //写入条件变量
boost::condition_variable_any cond_get; //读取条件变量
std::stack<int> stk; //缓冲区对象
int un_read;
int capacity;
};
Buffer buf(5); //定义一个缓冲区对象
boost::mutex io_mu_; //定义一个输出互斥量
//生产者,n个
void producer(int n)
{
for (int i = 0; i < n; i++) {
//输出信息
{
boost::mutex::scoped_lock lock(io_mu_);
std::cout << "put " << i << " to buffer" << std::endl;
}
buf.put(i); //写入数据
}
}
//消费者
void consumer(int n)
{
int result(0);
for (int i = 0; i < n; i++) {
{
buf.get(&result); //读取数据
boost::mutex::scoped_lock lock(io_mu_);
std::cout << "get " << result << " from buffer" << std::endl;
}
}
}
TEST(BoostThread, producer_consumer_test)
{
boost::thread t_producer(producer, 20);
boost::thread t_consumer1(consumer, 10);
boost::thread t_consumer2(consumer, 10);
t_producer.join();
t_consumer1.join();
t_consumer2.join();
}
class ThreadPool {
public:
explicit ThreadPool(size_t size) : work_(io_service_) {
for (size_t i = 0; i < size; ++i) {
workers_.create_thread(
boost::bind(&boost::asio::io_service::run, &io_service_));
}
}
~ThreadPool() {
io_service_.stop();
workers_.join_all();
}
// Add new work item to the pool.
template<class F>
void Enqueue(F f) {
io_service_.post(f);
}
private:
boost::thread_group workers_;
boost::asio::io_service io_service_;
boost::asio::io_service::work work_;
};
TEST(BoostThread, ThreadPool)
{
// Create a thread pool of 4 worker threads.
ThreadPool pool(4);
// Queue a bunch of work items.
for (int i = 0; i < 8; ++i) {
pool.Enqueue([i] {
std::cout << "hello " << i << std::endl;
boost::this_thread::sleep(boost::posix_time::seconds(1));
std::cout << "world " << i << std::endl;
});
}
}
void init_number_generator()
{
static boost::thread_specific_ptr<bool> tls;
if (!tls.get())
tls.reset(new bool(false));
if (!*tls)
{
*tls = true;
std::srand(static_cast<unsigned int>(std::time(0)));
}
}
boost::mutex mutex;
void random_number_generator()
{
init_number_generator();
int i = std::rand();
boost::lock_guard<boost::mutex> lock(mutex);
std::cout << i << std::endl;
}
TEST(BoostThread, thread_specific_ptr)//线程局部存储
{
boost::thread t[3];
for (int i = 0; i < 3; ++i)
t[i] = boost::thread(random_number_generator);
for (int i = 0; i < 3; ++i)
t[i].join();
}
std::once_flag flag1, flag2;
void simple_do_once()
{
std::call_once(flag1, []() { std::cout << "Simple example: called once\n"; });
}
void may_throw_function(bool do_throw)
{
if (do_throw) {
std::cout << "throw: call_once will retry\n"; // 这会出现多于一次
throw std::exception();
}
std::cout << "Didn't throw, call_once will not attempt again\n"; // 保证一次
}
void do_once(bool do_throw)
{
try {
std::call_once(flag2, may_throw_function, do_throw);
}
catch (...) {
}
}
TEST(StdThread, std_call_once)
{
std::thread st1(simple_do_once);
std::thread st2(simple_do_once);
std::thread st3(simple_do_once);
std::thread st4(simple_do_once);
st1.join();
st2.join();
st3.join();
st4.join();
std::thread t1(do_once, true);
std::thread t2(do_once, true);
std::thread t3(do_once, false);
std::thread t4(do_once, true);
t1.join();
t2.join();
t3.join();
t4.join();
}
// Some sort of connection class that should only be initialized once
struct Conn {
static void init() { ++i_; }
static boost::once_flag init_;
static int i_;
// ...
};
int Conn::i_ = 0;
boost::once_flag Conn::init_ = BOOST_ONCE_INIT;
void worker() {
boost::call_once(Conn::init, Conn::init_);
// Do the real work...
}
Conn c; // You probably don't want to use a global, so see the
// next Recipe
TEST(BoostThread, Boost_call_once)
{
boost::thread_group grp;
for (int i = 0; i < 100; ++i)
grp.create_thread(worker);
grp.join_all();
std::cout << c.i_ << '\n';// c.i_ = 1
}
int fab(int n)
{
if (n == 0 || n == 1)
{
return 1;
}
return fab(n - 1) + fab(n - 2);
}
TEST(BoostThread, futuretest)
{
// 声明packaged_task对象,用模板参数指明返回值类型
// packaged_task只接受无参函数,因此需要使用bind
boost::packaged_task<int> pt(boost::bind(fab, 10));
// 声明unique_future对象,接受packaged_task的future值
// 同样要用模板参数指明返回值类型
boost::unique_future<int> uf = pt.get_future();
// 启动线程计算,必须使用boost::move()来转移package_task对象
// 因为packaged_task是不可拷贝的
boost::thread(boost::move(pt));
uf.wait(); // unique_future等待计算结果
assert(uf.is_ready() && uf.has_value());
std::cout << uf.get()<<std::endl; // 输出计算结果99
}
int calculate_the_answer_to_life_the_universe_and_everything()
{
return 42;
}
void invoke_lazy_task(boost::packaged_task<int>& task)
{
try
{
task();
}
catch (boost::task_already_started&)
{
}
}
//shared_future与unique_future
TEST(BoostThread, BoostFuture)
{
{
boost::packaged_task<int> pt(calculate_the_answer_to_life_the_universe_and_everything);
boost::unique_future<int> fi = pt.get_future();
boost::thread task(boost::move(pt)); // launch task on a thread
fi.wait(); // wait for it to finish
ASSERT_TRUE(fi.is_ready());
ASSERT_TRUE(fi.has_value());
ASSERT_TRUE(!fi.has_exception());
ASSERT_TRUE(fi.get_state() == boost::future_state::ready);
ASSERT_TRUE(fi.get() == 42);
}
{
boost::promise<int> pi;
boost::unique_future<int> fi;
fi = pi.get_future();
pi.set_value(42);
ASSERT_TRUE(fi.is_ready());
ASSERT_TRUE(fi.has_value());
ASSERT_TRUE(!fi.has_exception());
ASSERT_TRUE(fi.get_state() == boost::future_state::ready);
ASSERT_TRUE(fi.get() == 42);
}
{
boost::packaged_task<int> task(calculate_the_answer_to_life_the_universe_and_everything);
task.set_wait_callback(invoke_lazy_task);
boost::unique_future<int> f(task.get_future());
ASSERT_TRUE(f.get() == 42);
}
}
Win32 API
内核对象的线程同步则主要由事件
、等待定时器
、信号量
以及信号灯
等内核对象构成。由于这种同步机制使用了内核对象,使用时必须将线程从用户模式切换到内核模式,而这种转换一般要耗费近千个CPU周期,因此同步速度较慢,但在适用性上却要远优于用户模式的线程同步方式。
在WIN32中,同步机制主要有以下几种:
(1)事件(Event);
(2)信号量(semaphore);
(3)互斥量(mutex);
(4)临界区(Critical section)。
下面简单对比一下Windows中的Mutex、Semaphore、Event和Critical Section。
Mutext,也叫做Mutant。只允许一个线程进入,这个进入的线程被认为是Mutex的所有者。所有者可以重入。Mutex的所有者需要在操作完成后释放这个Mutex,如果没有释放就结束了,操作系统会释放这个Mutex,但是会设置成Abandoned Mutex。
Semaphore。维护了一个计数器,每当一个线程获取这个Semaphore,就会减一。当线程释放这个Semaphore,就会加一。Semaphore不维护所有者信息。
Event。维护一个布尔标志。分为自动和手动两种。
Critical Section。和Mutex类似,但是是用户态的对象,前面3个都是内核态的。只能用在进程内。线程退出前必须释放(LeaveCriticalSection),否则其他等待这个Critical Section的线程会永远等待下去。
windows其实也有条件变量和读写锁。
- 可等待定时器
- 原子锁
根据用户模式及内核模式下的同步方式的不同,分类及对比如下:
自旋锁
如果自旋锁已经被锁住,这时有程序申请“获取”这个自旋锁时,程序则处于“自旋”状态。所谓自旋状态,就是不停的询问是否可以“获取”自旋锁。
自旋锁不同于其他的等待事件。在线程中如果等待某个事件(Event),操作系统会让这个线程进入睡眠状态,CPU会转而运行其他线程。而自旋锁则不同,它不会切换到别的线程,而是一直让这个线程“自旋”。因此,对自旋锁占用时间不宜过长,否则会导致申请自旋锁的其他线程处于自旋,这会浪费宝贵的CPU时间。
自旋锁的作用一般是使各派遣函数之间同步。尽量不要将自旋锁放在全局变量中,而应该将自旋锁放在设备扩展里面。自旋锁用KSPIN_LOCK数据结构表示。
内核模式下线程同步
内核模式下的线程同步——事件内核对象
Windows Via C/C++:内核模式下的线程同步——概述
以下是拥有signaled/unsignaled状态的内核对象:
进程(Processes)
线程(Threads)
作业(Jobs)
文件和控制台标准输入/输出/错误流(File and console standard input/output/error streams)
事件(Events)
可待待定时器(Waitable timers)
信号量(Semaphores)
互斥量(Mutexes)
内核对象线程同步】概述
用户方式同步的优点是它的 同步速度非常快。但它也有其局限性。
线程同步方式比较
windows 多线程: CreateThread、_beginthread、_beginthreadex、AfxBeginThread 的区别
pthread
线程同步:
线程可分为UI线程,工作者线程
- 临界区(Critical Section)
- 互斥量(Mutex)、互斥对象
- 信号量(Semaphore)
- 事件(Event)
- 读写锁shared_mutex、shared_lock、unique_lock
- 可等待定时器
- 原子锁
- 条件变量
- 线程池threadpool
- 生产者-消费者
- 期货future
进程间的同步
windows和linux共有的进程间通信方式:1. 消息(linux中叫做信号) 2. 共享内存 3. 邮槽 4. 管道 5.socket
windows
- 文件映射
- 共享内存(是文件映射的一种特殊情况)
- 邮件槽(mailslot)(点对点消息队列)CreateMailSlot
- 命名管道 createNamePipe connectNamePipe
- 匿名管道
- 剪贴板
- 动态数据交换DDE
- 对象链接与嵌入OLE
- 远程过程调用RPC
- 动态链接库DLL
- Socket
- WM_COPYDATA 消息
linux
- 无名管道Pipe
- 信号量semaphore
- 共享内存
- 消息队列、报文队列
- 套接字
- 信号Signal
- D-Bus主要概念是总线
windows和linux共有的进程间通信方式:1. 消息(linux中叫做信号) 2. 共享内存 3. 邮槽 4. 管道 5.socket
其他总结:
Unix和windows进程间通信的主要方式
linux系统IPC:
管道( pipe ):管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用。进程的亲缘关系通常是指父子进程关系。
命名管道 (named pipe) : 命名管道也是半双工的通信方式,但是它允许无亲缘关系进程间的通信。
信号量( semophore ): 信号量是一个计数器,可以用来控制多个进程对共享资源的访问。它常作为一种锁机制,防止某进程正在访问共享资源时,其他进程也访问该资源。因此,主要作为进程间以及同一进程内不同线程之间的同步手段。
消息队列( message queue ) : 消息队列是由消息的链表,存放在内核中并由消息队列标识符标识。消息队列克服了信号传递信息少、管道只能承载无格式字节流以及缓冲区大小受限等缺点。
信号 ( sinal ):信号是一种比较复杂的通信方式,用于通知接收进程某个事件已经发生。
共享内存( shared memory ) :共享内存就是映射一段能被其他进程所访问的内存,这段共享内存由一个进程创建,但多个进程都可以访问。共享内存是最快的 IPC 方式,它是针对其他进程间通信方式运行效率低而专门设计的。它往往与其他通信机制,如信号量,配合使用,来实现进程间的同步和通信。
套接字( socket ) : 套解口也是一种进程间通信机制,与其他通信机制不同的是,它可用于不同及其间的进程通信。
windows系统IPC:
剪贴板(Clipboard):当用户在应用程序中执行剪切或复制操作时,应用程序将选定的数据以一个或多个标准或应用程序定义的格式放在剪贴板中。
WM_COPYDATA消息:当一个应用向另一个应用传送数据时,发送方只需使用调用SendMessage函数, 接收方只需像处理其它消息那样处理WM_COPYDATA消息,这样收发双方就实现了数据共享,它在底层实际上是通过文件映射来实现的。
文件映射(File Mapping ):使进程把文件内容当作进程地址区间一块内存那样来对待。只需简单的指针操作就可读取和修改文件的内容。 允许多个进程访问同一文件映射对象,各个进程在它自己的地址空间里接收内存的指针,通过使用这些指针,不同进程就可以读写文件的内容,实现了对文件中数据的共享。
共享内存(Shared Memory)是文件映射的一种特殊情况进程在创建文件映射对象时用0xFFFFFFFF来代替文件句柄(HANDLE),就表示了对应的文件映射对象是从操作系统页面文件访问内存,其它进程打开该文件映射对象就可以访问该内存块。由于共享内存是用 文件映射实现的, 所以它也有较好的安全性,也只能运行于同一计算机上的进程之间。
动态数据交换(DDE):是使用共享内存在应用程序之间进行数据交换的一种进程间通信形式。应用程序可以使用DDE进行一次性数据传输,也可以当出现新数据时, 通过发送更新值在应用程序间动态交换数据。DDE和剪贴板一样既支持标准数据格式(如文本、位图等),又可以支持自己定义的数据格式。但它们的数据传输机制却不同,一个明显区别是剪贴板操作几乎总是用作对用户指定操作的一次性应答,如从菜单中选择Paste命令。尽管DDE也可以由用户启动,但它继续发挥作用一般不必用户进一步干预。可以发生在单机或网络中不同计算机的应用程序之间。
邮件槽(Mailslot):提供进程间单向通信能力,任何进程都能建立邮件槽成为邮件槽服务器。其它进程称为邮件槽客户,可以通过邮件槽的名字给邮件槽服务器进程发送消息。进来的消息一直放在邮件槽中,直到服务器进程读取它为止。一个进程既可以是邮件槽服务器也可以是邮件槽客户,因此可建立多个 邮件槽实现进程间的双向通信。
管道( pipe ):同上linux系统 & 命名管道
套接字(Sockets ):同上linux系统
https://www.cnblogs.com/findumars/p/6329593.html
http://blog.163.com/laylau_ll/blog/static/178625909201173131345423/
协程
协程,又称微线程,纤程。
Boost.Coroutine
原子操作
原子库为细粒度的原子操作提供组件,允许无锁并发编程。
http://zh.cppreference.com/w/cpp/atomic
Win32多线程程序设计中关键函数
- _beginthread
- _beginthreadex
- _endthread
- _endthreadex
- AfxBeginThread
- CloseHandle
- CoInitializeEx
- CreateEvent
- CreateFile
- CreateFileMapping
- CreateIoCompletionPort
- CreateMutex
- CreateSemaphore
- CreateThread
- DeleteCriticalSection
- DisableThreadLibraryCalls
- EnterCriticalSection
- ExitThread
- FileIOCompletionRoutine
- GetExitCodeThread
- GetQueuedCompletionStatus
- GetStdHandle
- GetThreadPriority
- InitializeCriticalSection
- InterlockedDecrement
- InterlockedExchange
- InterlockedIncrement
- LeaveCriticalSection
- MapViewOfFile
- MsgWaitForMultipleObjects
- MsgWaitForMultipleObjectsEx
- MsgWaitForMultipleObjects
- OpenFileMapping
- PostThreadMessage
- ReadFile
- ReadFileEx
- ReleaseMutex
- ReleaseSemaphore
- ReplyMessage
- ResumeThread
- SetThreadPriority
- SleepEx
- SuspendThread
- TerminateThread
- TlsAlloc
- TlsFree
- TlsGetValue
- TlsSetValue
- UnmapViewOfFile
- GetOverlappedResult
- WaitForMultipleObjects
- WaitForMultipleObjectsEx
- WaitForSingleObject
- WaitForSingleObjectEx
- WriteFile
- WriteFileEx
C++11线程
线程支持库
http://zh.cppreference.com/w/cpp/thread
C++ 包含线程、互斥、条件变量和期货的内建支持。
互斥
定义于头文件
- mutex (C++11) 提供基本互斥设施 (类)
- timed_mutex (C++11) 提供互斥设施,实现有时限锁定 (类)
- recursive_mutex (C++11) 提供能被同一线程递归锁定的互斥设施 (类)
- recursive_timed_mutex (C++11) 提供能被同一线程递归锁定的互斥设施,并实现有时限锁定 (类)
定义于头文件
- shared_mutex (C++17) 提供共享互斥设施 (类)
- shared_timed_mutex (C++14) 提供共享互斥设施 (类)
通用互斥管理
定义于头文件
- lock_guard (C++11) 实现严格基于作用域的互斥所有权包装器 (类模板)
- scoped_lock (C++17) 多互斥的免死锁 RAII 封装器 (类模板)
- unique_lock (C++11) 实现可移动的互斥锁有权包装器 (类模板)
- shared_lock (C++14) 实现可移动的共享互斥所有权封装器 (类模板)
- defer_lock_t (C++11) 用于指定锁定策略的标签类型 (类)
- try_to_lock_t (C++11) 用于指定锁定策略的标签类型 (类)
- adopt_lock_t (C++11) 用于指定锁定策略的标签类型 (类)
- defer_lock (C++11) 用于指定锁定策略的标签常量 (常量)
- try_to_lock (C++11) 用于指定锁定策略的标签常量 (常量)
- adopt_lock (C++11) 用于指定锁定策略的标签常量 (常量)
常用代码
互斥锁
std::mutex g_pages_mutex;
std::lock_guard<std::mutex> guard(g_pages_mutex);
std::lock_guard
读写锁
// 多个线程/读者能同时读计数器的值。
unsigned int get() const {
std::shared_lock<std::shared_mutex> lock(mutex_);
return value_;
}
// 只有一个线程/写者能增加/写线程的值。
void increment() {
std::unique_lock<std::shared_mutex> lock(mutex_);
value_++;
}
// 只有一个线程/写者能重置/写线程的值。
void reset() {
std::unique_lock<std::shared_mutex> lock(mutex_);
value_ = 0;
}
std::async
std::async大概的工作过程:先将异步操作用std::packaged_task包装起来,然后将异步操作的结果放到std::promise中,这个过程就是创造未来的过程。外面再通过future.get/wait来获取这个未来的结果。
可以说,std::async帮我们将std::future、std::promise和std::packaged_task三者结合了起来。
死锁
Linux
一个 Linux 上分析死锁的简单方法
产生死锁的四个必要条件
- (1) 互斥条件:一个资源每次只能被一个进程(线程)使用。
- (2) 请求与保持条件:一个进程(线程)因请求资源而阻塞时,对已获得的资源保持不放。
- (3) 不剥夺条件 : 此进程(线程)已获得的资源,在末使用完之前,不能强行剥夺。
- (4) 循环等待条件 : 多个进程(线程)之间形成一种头尾相接的循环等待资源关系。
使用 pstack 和 gdb 工具对死锁程序进行分析
window
LockCop
LockCop通过下载Windows核心编程的源码编译可以得到
多线程程序开发过程中,最难的是对多线程的调试和逻辑分析,特别是与锁相关的缺陷分析,比如:死锁和无限等待。为此,微乳提供了一套API,从系统层面来辅助开发者分析各个线程的运行状态(running or blocking)和线程与锁的等待关系,这就是:Wait Chain Tranversal。
《Windows核心编程》也提供了Lockcop.exe和badlock.exe这一对示例来演示WCT
WCT
要检索一个或多个线程的等待链,请使用OpenThreadWaitChainSession和GetThreadWaitChain函数创建WCT会话。WCT会话由HWCT类型的句柄表示。会话本质上可以是同步的,也可以是异步的。同步会话将阻止调用线程,直到检索到等待链。无法取消同步会话。异步会话不会阻塞调用线程,并且可以使用CloseThreadWaitChainSession函数由应用程序取消。异步操作的结果通过应用程序提供的WaitChainCallback.aspx)回调函数提供。
对于异步会话,调用者可以通过GetThreadWaitChain指定指向上下文数据结构的指针。相同的指针传递给回调函数。该上下文数据结构是用户定义的并且对WCT不透明。应用程序可以使用它来在WCT查询和回调函数之间传递上下文。一种常见的方法是通过此结构传递事件句柄; 执行回调时,它会发出此事件的信号,并通知某个监控线程查询已完成。
Developing with Wait Chain Traversal
参考
https://segmentfault.com/a/1190000006232497
http://www.bogotobogo.com/cplusplus/multithreaded4_cplusplus11.php
书籍
- Win32多线程程序设计
- Windows核心编程
- C++并发编程