簡介
現(xiàn)在,您的計算機有四個 CPU 核;并行計算 是最時髦的主題,您急于掌握這種技術(shù)。但是,并行編程不只是在隨便什么函數(shù)和方法中使用互斥鎖和條件變量。C++ 開發(fā)人員必須掌握的關(guān)鍵技能之一是設(shè)計并發(fā)數(shù)據(jù)結(jié)構(gòu)。本文是兩篇系列文章的第一篇,討論如何在多線程環(huán)境中設(shè)計并發(fā)數(shù)據(jù)結(jié)構(gòu)。對于本文,我們使用 POSIX Threads 庫,但是也可以使用 Boost Threads 等實現(xiàn)。
本文假設(shè)您基本了解基本數(shù)據(jù)結(jié)構(gòu),比較熟悉 POSIX Threads 庫。您還應(yīng)該基本了解線程的創(chuàng)建、互斥鎖和條件變量。在本文的示例中,會相當頻繁地使用 pthread_mutex_lock、pthread_mutex_unlock、pthread_cond_wait、pthread_cond_signal 和pthread_cond_broadcast。
設(shè)計并發(fā)隊列
我們首先擴展最基本的數(shù)據(jù)結(jié)構(gòu)之一:隊列。我們的隊列基于鏈表。底層列表的接口基于 Standard Template Library。多個控制線程可以同時在隊列中添加數(shù)據(jù)或刪除數(shù)據(jù),所以需要用互斥鎖對象管理同步。隊列類的構(gòu)造函數(shù)和析構(gòu)函數(shù)負責創(chuàng)建和銷毀互斥鎖,見 清單 1。
清單 1. 基于鏈表和互斥鎖的并發(fā)隊列
#include <pthread.h>#include <list.h> // you could use std::list or your implementation namespace concurrent { template <typename T>class Queue { public: Queue( ) { pthread_mutex_init(&_lock, NULL); } ~Queue( ) { pthread_mutex_destroy(&_lock); } void push(const T& data); T pop( ); private: list<T> _list; pthread_mutex_t _lock;}}; |
在并發(fā)隊列中插入和刪除數(shù)據(jù)
顯然,把數(shù)據(jù)放到隊列中就像是把數(shù)據(jù)添加到列表中,必須使用互斥鎖保護這個操作。但是,如果多個線程都試圖把數(shù)據(jù)添加到隊列中,會發(fā)生什么?第一個線程鎖住互斥并把數(shù)據(jù)添加到隊列中,而其他線程等待輪到它們操作。第一個線程解鎖/釋放互斥鎖之后,操作系統(tǒng)決定接下來讓哪個線程在隊列中添加數(shù)據(jù)。通常,在沒有實時優(yōu)先級線程的 Linux® 系統(tǒng)中,接下來喚醒等待時間最長的線程,它獲得鎖并把數(shù)據(jù)添加到隊列中。清單 2 給出代碼的第一個有效版本。
清單 2. 在隊列中添加數(shù)據(jù)
void Queue<T>::push(const T& value ) { pthread_mutex_lock(&_lock); _list.push_back(value); pthread_mutex_unlock(&_lock);} |
取出數(shù)據(jù)的代碼與此類似,見 清單 3。
清單 3. 從隊列中取出數(shù)據(jù)
T Queue<T>::pop( ) { if (_list.empty( )) { throw ”element not found”; } pthread_mutex_lock(&_lock); T _temp = _list.front( ); _list.pop_front( ); pthread_mutex_unlock(&_lock); return _temp;} |
清單 2 和 清單 3 中的代碼是有效的。但是,請考慮這樣的情況:您有一個很長的隊列(可能包含超過 100,000 個元素),而且在代碼執(zhí)行期間的某個時候,從隊列中讀取數(shù)據(jù)的線程遠遠多于添加數(shù)據(jù)的線程。因為添加和取出數(shù)據(jù)操作使用相同的互斥鎖,所以讀取數(shù)據(jù)的速度會影響寫數(shù)據(jù)的線程訪問鎖。那么,使用兩個鎖怎么樣?一個鎖用于讀取操作,另一個用于寫操作。清單 4 給出修改后的Queue 類。
清單 4. 對于讀和寫操作使用單獨的互斥鎖的并發(fā)隊列
template <typename T>class Queue { public: Queue( ) { pthread_mutex_init(&_rlock, NULL); pthread_mutex_init(&_wlock, NULL); } ~Queue( ) { pthread_mutex_destroy(&_rlock); pthread_mutex_destroy(&_wlock); } void push(const T& data); T pop( ); private: list<T> _list; pthread_mutex_t _rlock, _wlock;} |
清單 5 給出 push/pop 方法的定義。
清單 5. 使用單獨互斥鎖的并發(fā)隊列 Push/Pop 操作
void Queue<T>::push(const T& value ) { pthread_mutex_lock(&_wlock); _list.push_back(value); pthread_mutex_unlock(&_wlock);}T Queue<T>::pop( ) { if (_list.empty( )) { throw ”element not found”; } pthread_mutex_lock(&_rlock); T _temp = _list.front( ); _list.pop_front( ); pthread_mutex_unlock(&_rlock); return _temp;} |
設(shè)計并發(fā)阻塞隊列
目前,如果讀線程試圖從沒有數(shù)據(jù)的隊列讀取數(shù)據(jù),僅僅會拋出異常并繼續(xù)執(zhí)行。但是,這種做法不總是我們想要的,讀線程很可能希望等待(即阻塞自身),直到有數(shù)據(jù)可用時為止。這種隊列稱為阻塞的隊列。如何讓讀線程在發(fā)現(xiàn)隊列是空的之后等待?一種做法是定期輪詢隊列。但是,因為這種做法不保證隊列中有數(shù)據(jù)可用,它可能會導致浪費大量 CPU 周期。推薦的方法是使用條件變量 — 即 pthread_cond_t 類型的變量。在深入討論語義之前,先來看一下修改后的隊列定義,見 清單 6。
清單 6. 使用條件變量的并發(fā)阻塞隊列
template <typename T>class BlockingQueue { public: BlockingQueue ( ) { pthread_mutex_init(&_lock, NULL); pthread_cond_init(&_cond, NULL); } ~BlockingQueue ( ) { pthread_mutex_destroy(&_lock); pthread_cond_destroy(&_cond); } void push(const T& data); T pop( ); private: list<T> _list; pthread_mutex_t _lock; pthread_cond_t _cond;} |
清單 7 給出阻塞隊列的 pop 操作定義。
清單 7. 從隊列中取出數(shù)據(jù)
T BlockingQueue<T>::pop( ) { pthread_mutex_lock(&_lock); if (_list.empty( )) { pthread_cond_wait(&_cond, &_lock) ; } T _temp = _list.front( ); _list.pop_front( ); pthread_mutex_unlock(&_lock); return _temp;} |
當隊列是空的時候,讀線程現(xiàn)在并不拋出異常,而是在條件變量上阻塞自身。pthread_cond_wait 還隱式地釋放 mutex_lock,F(xiàn)在,考慮這個場景:有兩個讀線程和一個空的隊列。第一個讀線程鎖住互斥鎖,發(fā)現(xiàn)隊列是空的,然后在 _cond 上阻塞自身,這會隱式地釋放互斥鎖。第二個讀線程經(jīng)歷同樣的過程。因此,最后兩個讀線程都等待條件變量,互斥鎖沒有被鎖住。
現(xiàn)在,看一下 push() 方法的定義,見 清單 8。
清單 8. 在阻塞隊列中添加數(shù)據(jù)
void BlockingQueue <T>::push(const T& value ) { pthread_mutex_lock(&_lock); const bool was_empty = _list.empty( ); _list.push_back(value); pthread_mutex_unlock(&_lock); if (was_empty) pthread_cond_broadcast(&_cond);} |
如果列表原來是空的,就調(diào)用 pthread_cond_broadcast 以宣告列表中已經(jīng)添加了數(shù)據(jù)。這么做會喚醒所有等待條件變量 _cond 的讀線程;讀線程現(xiàn)在隱式地爭奪互斥鎖。操作系統(tǒng)調(diào)度程序決定哪個線程獲得對互斥鎖的控制權(quán) — 通常,等待時間最長的讀線程先讀取數(shù)據(jù)。
并發(fā)阻塞隊列設(shè)計有兩個要注意的方面:
- 可以不使用 pthread_cond_broadcast,而是使用 pthread_cond_signal。但是,pthread_cond_signal 會釋放至少一個等待條件變量的線程,這個線程不一定是等待時間最長的讀線程。盡管使用 pthread_cond_signal 不會損害阻塞隊列的功能,但是這可能會導致某些讀線程的等待時間過長。
- 可能會出現(xiàn)虛假的線程喚醒。因此,在喚醒讀線程之后,要確認列表非空,然后再繼續(xù)處理。清單 9 給出稍加修改的 pop()方法,強烈建議使用基于 while 循環(huán)的 pop() 版本。
清單 9. 能夠應(yīng)付虛假喚醒的 pop() 方法
T BlockingQueue<T>::pop( ) { pthread_cond_wait(&_cond, &_lock) ; while(_list.empty( )) { pthread_cond_wait(&_cond) ; } T _temp = _list.front( ); _list.pop_front( ); pthread_mutex_unlock(&_lock); return _temp;} |
設(shè)計有超時限制的并發(fā)阻塞隊列
在許多系統(tǒng)中,如果無法在特定的時間段內(nèi)處理新數(shù)據(jù),就根本不處理數(shù)據(jù)了。例如,新聞頻道的自動收報機顯示來自金融交易所的實時股票行情,它每 n 秒收到一次新數(shù)據(jù)。如果在 n 秒內(nèi)無法處理以前的一些數(shù)據(jù),就應(yīng)該丟棄這些數(shù)據(jù)并顯示最新的信息。根據(jù)這個概念,我們來看看如何給并發(fā)隊列的添加和取出操作增加超時限制。這意味著,如果系統(tǒng)無法在指定的時間限制內(nèi)執(zhí)行添加和取出操作,就應(yīng)該根本不執(zhí)行操作。清單 10 給出接口。
清單 10. 添加和取出操作有時間限制的并發(fā)隊列
template <typename T>class TimedBlockingQueue { public: TimedBlockingQueue ( ); ~TimedBlockingQueue ( ); bool push(const T& data, const int seconds); T pop(const int seconds); private: list<T> _list; pthread_mutex_t _lock; pthread_cond_t _cond;} |
首先看看有時間限制的 push() 方法。push() 方法不依賴于任何條件變量,所以沒有額外的等待。造成延遲的惟一原因是寫線程太多,要等待很長時間才能獲得鎖。那么,為什么不提高寫線程的優(yōu)先級?原因是,如果所有寫線程的優(yōu)先級都提高了,這并不能解決問題。相反,應(yīng)該考慮創(chuàng)建少數(shù)幾個調(diào)度優(yōu)先級高的寫線程,把應(yīng)該確保添加到隊列中的數(shù)據(jù)交給這些線程。清單 11 給出代碼。
清單 11. 把數(shù)據(jù)添加到阻塞隊列中,具有超時限制
bool TimedBlockingQueue <T>::push(const T& data, const int seconds) { struct timespec ts1, ts2; const bool was_empty = _list.empty( ); clock_gettime(CLOCK_REALTIME, &ts1); pthread_mutex_lock(&_lock); clock_gettime(CLOCK_REALTIME, &ts2); if ((ts2.tv_sec – ts1.tv_sec) <seconds) { was_empty = _list.empty( ); _list.push_back(value); { pthread_mutex_unlock(&_lock); if (was_empty) pthread_cond_broadcast(&_cond);} |
clock_gettime 例程返回一個 timespec 結(jié)構(gòu),它是系統(tǒng)紀元以來經(jīng)過的時間。在獲取互斥鎖之前和之后各調(diào)用這個例程一次,從而根據(jù)經(jīng)過的時間決定是否需要進一步處理。
具有超時限制的取出數(shù)據(jù)操作比添加數(shù)據(jù)復(fù)雜;注意,讀線程會等待條件變量。第一個檢查與 push() 相似。如果在讀線程能夠獲得互斥鎖之前發(fā)生了超時,那么不需要進行處理。接下來,讀線程需要確保(這是第二個檢查)它等待條件變量的時間不超過指定的超時時間。如果到超時時間段結(jié)束時還沒有被喚醒,讀線程需要喚醒自身并釋放互斥鎖。
有了這些背景知識,我們來看看 pthread_cond_timedwait 函數(shù),這個函數(shù)用于進行第二個檢查。這個函數(shù)與 pthread_cond_wait 相似,但是第三個參數(shù)是絕對時間值,到達這個時間時讀線程自愿放棄等待。如果在超時之前讀線程被喚醒,pthread_cond_timedwait的返回值是 0。清單 12 給出代碼。
清單 12. 從阻塞隊列中取出數(shù)據(jù),具有超時限制
T TimedBlockingQueue <T>::pop(const int seconds) { struct timespec ts1, ts2; clock_gettime(CLOCK_REALTIME, &ts1); pthread_mutex_lock(&_lock); clock_gettime(CLOCK_REALTIME, &ts2); // First Check if ((ts1.tv_sec – ts2.tv_sec) < seconds) { ts2.tv_sec += seconds; // specify wake up time while(_list.empty( ) && (result == 0)) { result = pthread_cond_timedwait(&_cond, &_lock, &ts2) ; } if (result == 0) { // Second Check T _temp = _list.front( ); _list.pop_front( ); pthread_mutex_unlock(&_lock); return _temp; } } pthread_mutex_unlock(&lock); throw “timeout happened”;} |
清單 12 中的 while 循環(huán)確保正確地處理虛假的喚醒。最后,在某些 Linux 系統(tǒng)上,clock_gettime 可能是 librt.so 的組成部分,可能需要在編譯器命令行中添加 –lrt 開關(guān)。
使用 pthread_mutex_timedlock API
清單 11 和 清單 12 的缺點之一是,當線程最終獲得鎖時,可能已經(jīng)超時了。因此,它只能釋放鎖。如果系統(tǒng)支持的話,可以使用pthread_mutex_timedlock API 進一步優(yōu)化這個場景。這個例程有兩個參數(shù),第二個參數(shù)是絕對時間值。如果在到達這個時間時還無法獲得鎖,例程會返回且狀態(tài)碼非零。因此,使用這個例程可以減少系統(tǒng)中等待的線程數(shù)量。下面是這個例程的聲明:
int pthread_mutex_timedlock(pthread_mutex_t *mutex, const struct timespec *abs_timeout); |
設(shè)計有大小限制的并發(fā)阻塞隊列
最后,討論有大小限制的并發(fā)阻塞隊列。這種隊列與并發(fā)阻塞隊列相似,但是對隊列的大小有限制。在許多內(nèi)存有限的嵌入式系統(tǒng)中,確實需要有大小限制的隊列。
對于阻塞隊列,只有讀線程需要在隊列中沒有數(shù)據(jù)時等待。對于有大小限制的阻塞隊列,如果隊列滿了,寫線程也需要等待。這種隊列的外部接口與阻塞隊列相似,見 清單 13。(注意,這里使用向量而不是列表。如果愿意,可以使用基本的 C/C++ 數(shù)組并把它初始化為指定的大小。)
清單 13. 有大小限制的并發(fā)阻塞隊列
template <typename T>class BoundedBlockingQueue { public: BoundedBlockingQueue (int size) : maxSize(size) { pthread_mutex_init(&_lock, NULL); pthread_cond_init(&_rcond, NULL); pthread_cond_init(&_wcond, NULL); _array.reserve(maxSize); } ~BoundedBlockingQueue ( ) { pthread_mutex_destroy(&_lock); pthread_cond_destroy(&_rcond); pthread_cond_destroy(&_wcond); } void push(const T& data); T pop( ); private: vector<T> _array; // or T* _array if you so prefer int maxSize; pthread_mutex_t _lock; pthread_cond_t _rcond, _wcond;} |
在解釋添加數(shù)據(jù)操作之前,看一下 清單 14 中的代碼。
清單 14. 在有大小限制的阻塞隊列中添加數(shù)據(jù)
void BoundedBlockingQueue <T>::push(const T& value ) { pthread_mutex_lock(&_lock); const bool was_empty = _array.empty( ); while (_array.size( ) == maxSize) { pthread_cond_wait(&_wcond, &_lock); } _ array.push_back(value); pthread_mutex_unlock(&_lock); if (was_empty) pthread_cond_broadcast(&_rcond);} |
鎖是否可以擴展到其他數(shù)據(jù)結(jié)構(gòu)?[size=0.76em]當然可以。但這是最好的做法嗎?不是。考慮一個應(yīng)該允許多個線程使用的鏈表。與隊列不同,列表沒有單一的插入或刪除點,使用單一互斥鎖控制對列表的訪問會導致系統(tǒng)功能正常但相當慢。另一種實現(xiàn)是對每個節(jié)點使用鎖,但是這肯定會增加系統(tǒng)的內(nèi)存占用量。本系列的第二部分會討論這些問題。
本文討論了幾種并發(fā)隊列及其實現(xiàn)。實際上,還可能實現(xiàn)其他變體。例如這樣一個隊列,它只允許讀線程在數(shù)據(jù)插入隊列經(jīng)過指定的延時之后才能讀取數(shù)據(jù)。
Arpan Sen 是致力于電子設(shè)計自動化行業(yè)的軟件開發(fā)首席工程師。他使用各種 UNIX 版本(包括 Solaris、SunOS、HP-UX 和 IRIX)以及 Linux 和 Microsoft Windows 已經(jīng)多年。他熱衷于各種軟件性能優(yōu)化技術(shù)、圖論和并行計算。Arpan 獲得了軟件系統(tǒng)碩士學位。