亚洲av成人无遮挡网站在线观看,少妇性bbb搡bbb爽爽爽,亚洲av日韩精品久久久久久,兔费看少妇性l交大片免费,无码少妇一区二区三区

  免費注冊 查看新帖 |

Chinaunix

  平臺 論壇 博客 文庫
最近訪問板塊 發(fā)新帖
查看: 1581 | 回復(fù): 0
打印 上一主題 下一主題

用于并行計算的多線程數(shù)據(jù)結(jié)構(gòu),第 1 部分: 設(shè)計并發(fā)數(shù)據(jù)結(jié)構(gòu) [復(fù)制鏈接]

論壇徽章:
0
跳轉(zhuǎn)到指定樓層
1 [收藏(0)] [報告]
發(fā)表于 2012-01-17 17:12 |只看該作者 |倒序瀏覽
簡介
現(xiàn)在,您的計算機有四個 CPU 核;并行計算 是最時髦的主題,您急于掌握這種技術(shù)。但是,并行編程不只是在隨便什么函數(shù)和方法中使用互斥鎖和條件變量。C++ 開發(fā)人員必須掌握的關(guān)鍵技能之一是設(shè)計并發(fā)數(shù)據(jù)結(jié)構(gòu)。本文是兩篇系列文章的第一篇,討論如何在多線程環(huán)境中設(shè)計并發(fā)數(shù)據(jù)結(jié)構(gòu)。對于本文,我們使用 POSIX Threads 庫,但是也可以使用 Boost Threads 等實現(xiàn)。
本文假設(shè)您基本了解基本數(shù)據(jù)結(jié)構(gòu),比較熟悉 POSIX Threads 庫。您還應(yīng)該基本了解線程的創(chuàng)建、互斥鎖和條件變量。在本文的示例中,會相當頻繁地使用 pthread_mutex_lock、pthread_mutex_unlockpthread_cond_wait、pthread_cond_signalpthread_cond_broadcast。

設(shè)計并發(fā)隊列
我們首先擴展最基本的數(shù)據(jù)結(jié)構(gòu)之一:隊列。我們的隊列基于鏈表。底層列表的接口基于 Standard Template Library。多個控制線程可以同時在隊列中添加數(shù)據(jù)或刪除數(shù)據(jù),所以需要用互斥鎖對象管理同步。隊列類的構(gòu)造函數(shù)和析構(gòu)函數(shù)負責創(chuàng)建和銷毀互斥鎖,見 清單 1。

清單 1. 基于鏈表和互斥鎖的并發(fā)隊列
                                #include <pthread.h>#include <list.h> // you could use std::list or your implementation namespace concurrent { template <typename T>class Queue { public:    Queue( ) {        pthread_mutex_init(&_lock, NULL);     }     ~Queue( ) {        pthread_mutex_destroy(&_lock);    }     void push(const T& data);    T pop( ); private:     list<T> _list;     pthread_mutex_t _lock;}};

在并發(fā)隊列中插入和刪除數(shù)據(jù)
顯然,把數(shù)據(jù)放到隊列中就像是把數(shù)據(jù)添加到列表中,必須使用互斥鎖保護這個操作。但是,如果多個線程都試圖把數(shù)據(jù)添加到隊列中,會發(fā)生什么?第一個線程鎖住互斥并把數(shù)據(jù)添加到隊列中,而其他線程等待輪到它們操作。第一個線程解鎖/釋放互斥鎖之后,操作系統(tǒng)決定接下來讓哪個線程在隊列中添加數(shù)據(jù)。通常,在沒有實時優(yōu)先級線程的 Linux&reg; 系統(tǒng)中,接下來喚醒等待時間最長的線程,它獲得鎖并把數(shù)據(jù)添加到隊列中。清單 2 給出代碼的第一個有效版本。

清單 2. 在隊列中添加數(shù)據(jù)
                                void Queue<T>::push(const T& value ) {        pthread_mutex_lock(&_lock);       _list.push_back(value);       pthread_mutex_unlock(&_lock);}

取出數(shù)據(jù)的代碼與此類似,見 清單 3。

清單 3. 從隊列中取出數(shù)據(jù)
                                T Queue<T>::pop( ) {        if (_list.empty( )) {            throw ”element not found”;       }       pthread_mutex_lock(&_lock);        T _temp = _list.front( );       _list.pop_front( );       pthread_mutex_unlock(&_lock);       return _temp;}

清單 2 和 清單 3 中的代碼是有效的。但是,請考慮這樣的情況:您有一個很長的隊列(可能包含超過 100,000 個元素),而且在代碼執(zhí)行期間的某個時候,從隊列中讀取數(shù)據(jù)的線程遠遠多于添加數(shù)據(jù)的線程。因為添加和取出數(shù)據(jù)操作使用相同的互斥鎖,所以讀取數(shù)據(jù)的速度會影響寫數(shù)據(jù)的線程訪問鎖。那么,使用兩個鎖怎么樣?一個鎖用于讀取操作,另一個用于寫操作。清單 4 給出修改后的Queue 類。

清單 4. 對于讀和寫操作使用單獨的互斥鎖的并發(fā)隊列
                                template <typename T>class Queue { public:    Queue( ) {        pthread_mutex_init(&_rlock, NULL);        pthread_mutex_init(&_wlock, NULL);    }     ~Queue( ) {        pthread_mutex_destroy(&_rlock);       pthread_mutex_destroy(&_wlock);    }     void push(const T& data);    T pop( ); private:     list<T> _list;     pthread_mutex_t _rlock, _wlock;}

清單 5 給出 push/pop 方法的定義。

清單 5. 使用單獨互斥鎖的并發(fā)隊列 Push/Pop 操作
                                void Queue<T>::push(const T& value ) {        pthread_mutex_lock(&_wlock);       _list.push_back(value);       pthread_mutex_unlock(&_wlock);}T Queue<T>::pop( ) {        if (_list.empty( )) {            throw ”element not found”;       }       pthread_mutex_lock(&_rlock);       T _temp = _list.front( );       _list.pop_front( );       pthread_mutex_unlock(&_rlock);       return _temp;}


設(shè)計并發(fā)阻塞隊列
目前,如果讀線程試圖從沒有數(shù)據(jù)的隊列讀取數(shù)據(jù),僅僅會拋出異常并繼續(xù)執(zhí)行。但是,這種做法不總是我們想要的,讀線程很可能希望等待(即阻塞自身),直到有數(shù)據(jù)可用時為止。這種隊列稱為阻塞的隊列。如何讓讀線程在發(fā)現(xiàn)隊列是空的之后等待?一種做法是定期輪詢隊列。但是,因為這種做法不保證隊列中有數(shù)據(jù)可用,它可能會導致浪費大量 CPU 周期。推薦的方法是使用條件變量 — 即 pthread_cond_t 類型的變量。在深入討論語義之前,先來看一下修改后的隊列定義,見 清單 6。

清單 6. 使用條件變量的并發(fā)阻塞隊列
                                template <typename T>class BlockingQueue { public:    BlockingQueue ( ) {        pthread_mutex_init(&_lock, NULL);        pthread_cond_init(&_cond, NULL);    }     ~BlockingQueue ( ) {        pthread_mutex_destroy(&_lock);       pthread_cond_destroy(&_cond);    }     void push(const T& data);    T pop( ); private:     list<T> _list;     pthread_mutex_t _lock;    pthread_cond_t _cond;}

清單 7 給出阻塞隊列的 pop 操作定義。

清單 7. 從隊列中取出數(shù)據(jù)
                                T BlockingQueue<T>::pop( ) {        pthread_mutex_lock(&_lock);       if (_list.empty( )) {            pthread_cond_wait(&_cond, &_lock) ;       }       T _temp = _list.front( );       _list.pop_front( );       pthread_mutex_unlock(&_lock);       return _temp;}

當隊列是空的時候,讀線程現(xiàn)在并不拋出異常,而是在條件變量上阻塞自身。pthread_cond_wait 還隱式地釋放 mutex_lock,F(xiàn)在,考慮這個場景:有兩個讀線程和一個空的隊列。第一個讀線程鎖住互斥鎖,發(fā)現(xiàn)隊列是空的,然后在 _cond 上阻塞自身,這會隱式地釋放互斥鎖。第二個讀線程經(jīng)歷同樣的過程。因此,最后兩個讀線程都等待條件變量,互斥鎖沒有被鎖住。
現(xiàn)在,看一下 push() 方法的定義,見 清單 8。

清單 8. 在阻塞隊列中添加數(shù)據(jù)
                                void BlockingQueue <T>::push(const T& value ) {        pthread_mutex_lock(&_lock);       const bool was_empty = _list.empty( );       _list.push_back(value);       pthread_mutex_unlock(&_lock);       if (was_empty)            pthread_cond_broadcast(&_cond);}

如果列表原來是空的,就調(diào)用 pthread_cond_broadcast 以宣告列表中已經(jīng)添加了數(shù)據(jù)。這么做會喚醒所有等待條件變量 _cond 的讀線程;讀線程現(xiàn)在隱式地爭奪互斥鎖。操作系統(tǒng)調(diào)度程序決定哪個線程獲得對互斥鎖的控制權(quán) — 通常,等待時間最長的讀線程先讀取數(shù)據(jù)。
并發(fā)阻塞隊列設(shè)計有兩個要注意的方面:
  • 可以不使用 pthread_cond_broadcast,而是使用 pthread_cond_signal。但是,pthread_cond_signal 會釋放至少一個等待條件變量的線程,這個線程不一定是等待時間最長的讀線程。盡管使用 pthread_cond_signal 不會損害阻塞隊列的功能,但是這可能會導致某些讀線程的等待時間過長。
  • 可能會出現(xiàn)虛假的線程喚醒。因此,在喚醒讀線程之后,要確認列表非空,然后再繼續(xù)處理。清單 9 給出稍加修改的 pop()方法,強烈建議使用基于 while 循環(huán)的 pop() 版本。

清單 9. 能夠應(yīng)付虛假喚醒的 pop() 方法
                                T BlockingQueue<T>::pop( ) {        pthread_cond_wait(&_cond, &_lock) ;       while(_list.empty( )) {            pthread_cond_wait(&_cond) ;       }       T _temp = _list.front( );       _list.pop_front( );       pthread_mutex_unlock(&_lock);       return _temp;}


設(shè)計有超時限制的并發(fā)阻塞隊列
在許多系統(tǒng)中,如果無法在特定的時間段內(nèi)處理新數(shù)據(jù),就根本不處理數(shù)據(jù)了。例如,新聞頻道的自動收報機顯示來自金融交易所的實時股票行情,它每 n 秒收到一次新數(shù)據(jù)。如果在 n 秒內(nèi)無法處理以前的一些數(shù)據(jù),就應(yīng)該丟棄這些數(shù)據(jù)并顯示最新的信息。根據(jù)這個概念,我們來看看如何給并發(fā)隊列的添加和取出操作增加超時限制。這意味著,如果系統(tǒng)無法在指定的時間限制內(nèi)執(zhí)行添加和取出操作,就應(yīng)該根本不執(zhí)行操作。清單 10 給出接口。

清單 10. 添加和取出操作有時間限制的并發(fā)隊列
                                template <typename T>class TimedBlockingQueue { public:    TimedBlockingQueue ( );    ~TimedBlockingQueue ( );    bool push(const T& data, const int seconds);    T pop(const int seconds); private:     list<T> _list;     pthread_mutex_t _lock;    pthread_cond_t _cond;}

首先看看有時間限制的 push() 方法。push() 方法不依賴于任何條件變量,所以沒有額外的等待。造成延遲的惟一原因是寫線程太多,要等待很長時間才能獲得鎖。那么,為什么不提高寫線程的優(yōu)先級?原因是,如果所有寫線程的優(yōu)先級都提高了,這并不能解決問題。相反,應(yīng)該考慮創(chuàng)建少數(shù)幾個調(diào)度優(yōu)先級高的寫線程,把應(yīng)該確保添加到隊列中的數(shù)據(jù)交給這些線程。清單 11 給出代碼。

清單 11. 把數(shù)據(jù)添加到阻塞隊列中,具有超時限制
                                bool TimedBlockingQueue <T>::push(const T& data, const int seconds) {       struct timespec ts1, ts2;       const bool was_empty = _list.empty( );       clock_gettime(CLOCK_REALTIME, &ts1);       pthread_mutex_lock(&_lock);       clock_gettime(CLOCK_REALTIME, &ts2);       if ((ts2.tv_sec – ts1.tv_sec) <seconds) {       was_empty = _list.empty( );       _list.push_back(value);       {       pthread_mutex_unlock(&_lock);       if (was_empty)            pthread_cond_broadcast(&_cond);}

clock_gettime 例程返回一個 timespec 結(jié)構(gòu),它是系統(tǒng)紀元以來經(jīng)過的時間。在獲取互斥鎖之前和之后各調(diào)用這個例程一次,從而根據(jù)經(jīng)過的時間決定是否需要進一步處理。
具有超時限制的取出數(shù)據(jù)操作比添加數(shù)據(jù)復(fù)雜;注意,讀線程會等待條件變量。第一個檢查與 push() 相似。如果在讀線程能夠獲得互斥鎖之前發(fā)生了超時,那么不需要進行處理。接下來,讀線程需要確保(這是第二個檢查)它等待條件變量的時間不超過指定的超時時間。如果到超時時間段結(jié)束時還沒有被喚醒,讀線程需要喚醒自身并釋放互斥鎖。
有了這些背景知識,我們來看看 pthread_cond_timedwait 函數(shù),這個函數(shù)用于進行第二個檢查。這個函數(shù)與 pthread_cond_wait 相似,但是第三個參數(shù)是絕對時間值,到達這個時間時讀線程自愿放棄等待。如果在超時之前讀線程被喚醒,pthread_cond_timedwait的返回值是 0。清單 12 給出代碼。

清單 12. 從阻塞隊列中取出數(shù)據(jù),具有超時限制
                                T TimedBlockingQueue <T>::pop(const int seconds) {        struct timespec ts1, ts2;        clock_gettime(CLOCK_REALTIME, &ts1);        pthread_mutex_lock(&_lock);       clock_gettime(CLOCK_REALTIME, &ts2);       // First Check        if ((ts1.tv_sec – ts2.tv_sec) < seconds) {            ts2.tv_sec += seconds; // specify wake up time           while(_list.empty( ) && (result == 0)) {                result = pthread_cond_timedwait(&_cond, &_lock, &ts2) ;           }           if (result == 0) { // Second Check                T _temp = _list.front( );              _list.pop_front( );              pthread_mutex_unlock(&_lock);              return _temp;          }      }      pthread_mutex_unlock(&lock);      throw “timeout happened”;}

清單 12 中的 while 循環(huán)確保正確地處理虛假的喚醒。最后,在某些 Linux 系統(tǒng)上,clock_gettime 可能是 librt.so 的組成部分,可能需要在編譯器命令行中添加 –lrt 開關(guān)。
使用 pthread_mutex_timedlock API
清單 11 和 清單 12 的缺點之一是,當線程最終獲得鎖時,可能已經(jīng)超時了。因此,它只能釋放鎖。如果系統(tǒng)支持的話,可以使用pthread_mutex_timedlock API 進一步優(yōu)化這個場景。這個例程有兩個參數(shù),第二個參數(shù)是絕對時間值。如果在到達這個時間時還無法獲得鎖,例程會返回且狀態(tài)碼非零。因此,使用這個例程可以減少系統(tǒng)中等待的線程數(shù)量。下面是這個例程的聲明:
int pthread_mutex_timedlock(pthread_mutex_t *mutex,       const struct timespec *abs_timeout);


設(shè)計有大小限制的并發(fā)阻塞隊列
最后,討論有大小限制的并發(fā)阻塞隊列。這種隊列與并發(fā)阻塞隊列相似,但是對隊列的大小有限制。在許多內(nèi)存有限的嵌入式系統(tǒng)中,確實需要有大小限制的隊列。
對于阻塞隊列,只有讀線程需要在隊列中沒有數(shù)據(jù)時等待。對于有大小限制的阻塞隊列,如果隊列滿了,寫線程也需要等待。這種隊列的外部接口與阻塞隊列相似,見 清單 13。(注意,這里使用向量而不是列表。如果愿意,可以使用基本的 C/C++ 數(shù)組并把它初始化為指定的大小。)

清單 13. 有大小限制的并發(fā)阻塞隊列
                                template <typename T>class BoundedBlockingQueue { public:    BoundedBlockingQueue (int size) : maxSize(size) {        pthread_mutex_init(&_lock, NULL);        pthread_cond_init(&_rcond, NULL);       pthread_cond_init(&_wcond, NULL);       _array.reserve(maxSize);    }     ~BoundedBlockingQueue ( ) {        pthread_mutex_destroy(&_lock);       pthread_cond_destroy(&_rcond);       pthread_cond_destroy(&_wcond);    }     void push(const T& data);    T pop( ); private:     vector<T> _array; // or T* _array if you so prefer    int maxSize;    pthread_mutex_t _lock;    pthread_cond_t _rcond, _wcond;}

在解釋添加數(shù)據(jù)操作之前,看一下 清單 14 中的代碼。

清單 14. 在有大小限制的阻塞隊列中添加數(shù)據(jù)
                                void BoundedBlockingQueue <T>::push(const T& value ) {        pthread_mutex_lock(&_lock);       const bool was_empty = _array.empty( );       while (_array.size( ) == maxSize) {            pthread_cond_wait(&_wcond, &_lock);       }        _ array.push_back(value);      pthread_mutex_unlock(&_lock);      if (was_empty)           pthread_cond_broadcast(&_rcond);}

鎖是否可以擴展到其他數(shù)據(jù)結(jié)構(gòu)?[size=0.76em]
當然可以。但這是最好的做法嗎?不是。考慮一個應(yīng)該允許多個線程使用的鏈表。與隊列不同,列表沒有單一的插入或刪除點,使用單一互斥鎖控制對列表的訪問會導致系統(tǒng)功能正常但相當慢。另一種實現(xiàn)是對每個節(jié)點使用鎖,但是這肯定會增加系統(tǒng)的內(nèi)存占用量。本系列的第二部分會討論這些問題。


對于 清單 13 和 清單 14,要注意的第一點是,這個阻塞隊列有兩個條件變量而不是一個。如果隊列滿了,寫線程等待 _wcond 條件變量;讀線程在從隊列中取出數(shù)據(jù)之后需要通知所有線程。同樣,如果隊列是空的,讀線程等待 _rcond 變量,寫線程在把數(shù)據(jù)插入隊列中之后向所有線程發(fā)送廣播消息。如果在發(fā)送廣播通知時沒有線程在等待 _wcond_rcond,會發(fā)生什么?什么也不會發(fā)生;系統(tǒng)會忽略這些消息。還要注意,兩個條件變量使用相同的互斥鎖。清單 15 給出有大小限制的阻塞隊列的 pop() 方法。

清單 15. 從有大小限制的阻塞隊列中取出數(shù)據(jù)
                                T BoundedBlockingQueue<T>::pop( ) {        pthread_mutex_lock(&_lock);       const bool was_full = (_array.size( ) == maxSize);       while(_array.empty( )) {            pthread_cond_wait(&_rcond, &_lock) ;       }       T _temp = _array.front( );       _array.erase( _array.begin( ));       pthread_mutex_unlock(&_lock);       if (was_full)           pthread_cond_broadcast(&_wcond);       return _temp;}

注意,在釋放互斥鎖之后調(diào)用 pthread_cond_broadcast。這是一種好做法,因為這會減少喚醒之后讀線程的等待時間。

結(jié)束語
本文討論了幾種并發(fā)隊列及其實現(xiàn)。實際上,還可能實現(xiàn)其他變體。例如這樣一個隊列,它只允許讀線程在數(shù)據(jù)插入隊列經(jīng)過指定的延時之后才能讀取數(shù)據(jù)。

關(guān)于作者
Arpan Sen 是致力于電子設(shè)計自動化行業(yè)的軟件開發(fā)首席工程師。他使用各種 UNIX 版本(包括 Solaris、SunOS、HP-UX 和 IRIX)以及 Linux 和 Microsoft Windows 已經(jīng)多年。他熱衷于各種軟件性能優(yōu)化技術(shù)、圖論和并行計算。Arpan 獲得了軟件系統(tǒng)碩士學位。




http://www.ibm.com/developerworks/cn/aix/library/au-multithreaded_structures1/index.html

您需要登錄后才可以回帖 登錄 | 注冊

本版積分規(guī)則 發(fā)表回復(fù)

  

北京盛拓優(yōu)訊信息技術(shù)有限公司. 版權(quán)所有 京ICP備16024965號-6 北京市公安局海淀分局網(wǎng)監(jiān)中心備案編號:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年舉報專區(qū)
中國互聯(lián)網(wǎng)協(xié)會會員  聯(lián)系我們:huangweiwei@itpub.net
感謝所有關(guān)心和支持過ChinaUnix的朋友們 轉(zhuǎn)載本站內(nèi)容請注明原作者名及出處

清除 Cookies - ChinaUnix - Archiver - WAP - TOP