- 論壇徽章:
- 0
|
使用 Python 實(shí)現(xiàn)多進(jìn)程
學(xué)習(xí)使用 Python 2.6 管理一組進(jìn)程
2009-11-12
磁針石:xurongzhong#gmail.com
博客:oychw.cublog.cn
原文地址:http://www.ibm.com/developerworks/cn/aix/library/au-multiprocessing/index.html#resources
通過使用 Python 2.6 內(nèi)置的多進(jìn)程模塊,將您的 Unix® Python 應(yīng)用程序擴(kuò)展為使用多核。多進(jìn)程模擬了 Python 線程 API 的部分功能,讓開發(fā)人員能夠?qū)Χ嘟M進(jìn)程進(jìn)行高級(jí)控制,同時(shí)也合并了許多特定于進(jìn)程的額外特性。
簡(jiǎn)介
在 IBM® developerWorks® 的 早期文章 中,我演示了使用 Python 實(shí)現(xiàn)線程式編程的一種簡(jiǎn)單且有效的模式。但是,這種方法的一個(gè)缺陷就是它并不總是能夠提高應(yīng)用程序的速度,因?yàn)槿纸忉屍麈i(Global Interpreter Lock,GIL)將線程有效地限制到一個(gè)核中。如果需要使用計(jì)算機(jī)中的所有核,那么通常都需通過 對(duì) 經(jīng)常使用 fork 操作來實(shí)現(xiàn),從而提高速度。處理進(jìn)程組是件困難的事情,因?yàn)闉榱嗽谶M(jìn)程之間進(jìn)行通信,需要對(duì)所有調(diào)用進(jìn)行協(xié)調(diào),這通常會(huì)使事情變得更復(fù)雜。
幸運(yùn)的是,自 2.6 版本起,Python 包括了一個(gè)名為 “多進(jìn)程(multiprocessing)” 的模塊來幫助處理進(jìn)程。該進(jìn)程模塊的 API 與線程 API 的工作方式有些相似點(diǎn),但是也存在一些需要特別注意的不同之處。主要區(qū)別之一就是進(jìn)程擁有的一些微妙的底層行為,這是高級(jí) API 永遠(yuǎn)無法完全抽象出來的?梢詮亩噙M(jìn)程模塊的官方文檔中了解有關(guān)這方面內(nèi)容(參見 參考資料 小節(jié))。
fork 簡(jiǎn)介
進(jìn)程和線程在并發(fā)性的工作原理方面存在一些明顯的差異。通過閱讀我撰寫的有關(guān)線程的 developerWorks 文章,可以進(jìn)一步了解這些差異(參見 參考資料)。在進(jìn)程執(zhí)行 fork 時(shí),操作系統(tǒng)將創(chuàng)建具有新進(jìn)程 ID 的新的子進(jìn)程,復(fù)制父進(jìn)程的狀態(tài)(內(nèi)存、環(huán)境變量等)。首先,在我們實(shí)際使用進(jìn)程模塊之前,先看一下 Python 中的一個(gè)非;镜 fork 操作。
fork.py:
#!/usr/bin/env python
"""A basic fork in action"""
import os
def my_fork():
child_pid = os.fork()
if child_pid == 0:
print "Child Process: PID# %s" % os.getpid()
else:
print "Parent Process: PID# %s" % os.getpid()
if __name__ == "__main__":
my_fork()
現(xiàn)在來看一下以上代碼的輸出:
mac% python fork.py
Parent Process: PID# 5285
Child Process: PID# 5286
--下例和上面的例子基本類似,可以不實(shí)踐。
在下一個(gè)示例中,增強(qiáng)初始 fork 的代碼,并設(shè)置一個(gè)環(huán)境變量。該環(huán)境變量隨后將被復(fù)制到子進(jìn)程中。下面給出了相應(yīng)的代碼:
示例 1. Python 中的 fork 操作
#!/usr/bin/env python
"""A fork that demonstrates a copied environment"""
import os
from os import environ
def my_fork():
environ['FOO']="baz"
print "FOO environmental variable set to: %s" % environ['FOO']
environ['FOO']="bar"
print "FOO environmental variable changed to: %s" % environ['FOO']
child_pid = os.fork()
if child_pid == 0:
print "Child Process: PID# %s" % os.getpid()
print "Child FOO environmental variable == %s" % environ['FOO']
else:
print "Parent Process: PID# %s" % os.getpid()
print "Parent FOO environmental variable == %s" % environ['FOO']
if __name__ == "__main__":
my_fork()
下面給出了 fork 的輸出:
mac% python env_fork.py
FOO environmental variable set to: baz
FOO environmental variable changed to: bar
Parent Process: PID# 5333
Parent FOO environmental variable == bar
Child Process: PID# 5334
Child FOO environmental variable == bar
在輸出中,可以看到 “修改后的” 環(huán)境變量 FOO 留在了子進(jìn)程和父進(jìn)程中。您可以通過在父進(jìn)程中再次修改環(huán)境變量來進(jìn)一步測(cè)試這個(gè)示例,您將看到子進(jìn)程現(xiàn)在是完全獨(dú)立的,它有了自己的生命。注意,子進(jìn)程模塊也可用于 fork 進(jìn)程,但是實(shí)現(xiàn)方式?jīng)]有多進(jìn)程模塊那么復(fù)雜。
多進(jìn)程簡(jiǎn)介
現(xiàn)在您已經(jīng)了解 Python fork 操作的基本知識(shí),讓我們通過一個(gè)簡(jiǎn)單例子了解它在更高級(jí)的多進(jìn)程庫中的使用。在這個(gè)示例中,仍然會(huì)出現(xiàn) fork,但是已經(jīng)為我們處理了大部分標(biāo)準(zhǔn)工作。
示例 2. 簡(jiǎn)單的多進(jìn)程
--本例演示簡(jiǎn)單地適用multiprocessing創(chuàng)建進(jìn)程的方法,更多的請(qǐng)參考《Python 線程和并發(fā)》
#!/usr/bin/env python
from multiprocessing import Process
import os
import time
def sleeper(name, seconds):
print 'starting child process with id: ', os.getpid()
print 'parent process:', os.getppid()
print 'sleeping for %s ' % seconds
time.sleep(seconds)
print "Done sleeping"
if __name__ == '__main__':
print "in parent process (id %s)" % os.getpid()
p = Process(target=sleeper, args=('bob', 5))
p.start()
print "in parent process after child process start"
print "parent process about to join child process"
p.join()
print "in parent process after child process join"
print "parent process exiting with id ", os.getpid()
print "The parent's parent process:", os.getppid()
如果查看輸出,將會(huì)看到下面的內(nèi)容:
mac% python simple.py
in parent process (id 5245)
in parent process after child process start
parent process about to join child process
starting child process with id: 5246
parent process: 5245
sleeping for 5
Done sleeping
in parent process after child process join
parent process exiting with id 5245
The parent's parent process: 5231
可以看到從主進(jìn)程分出了一個(gè)子進(jìn)程,該子進(jìn)程隨后休眠了 5 秒種。子進(jìn)程分配是在調(diào)用 p.start() 時(shí)發(fā)生的。在下一節(jié)中,您將看到這個(gè)基礎(chǔ)的程序?qū)U(kuò)展為更大的程序。
--下例和上面的例子基本類似,只是涉及了Net-SNMP 而已,可以不實(shí)踐。
構(gòu)建異步 Net-SNMP 引擎
--以下步驟對(duì)于ubuntu很簡(jiǎn)單,只需要在新立德工具添加相關(guān)的軟件即可。
到目前為止,您尚未構(gòu)建任何特別有用的內(nèi)容。下一個(gè)示例將解決一個(gè)實(shí)際問題,為 Net-SNMP 異步生成 Python 綁定。默認(rèn)情況下,Net-SNMP 將阻塞每一個(gè) Python 調(diào)用。使用多進(jìn)程庫可以非常簡(jiǎn)單地將 Net-SNMP 庫轉(zhuǎn)換為完全異步的操作。
在開始之前,需要檢查是否安裝了一些必備的內(nèi)容,以便使用 Python 2.6 多進(jìn)程庫和 Net-SNMP 綁定:
1. 下載 Python 2.6 并針對(duì)所使用的操作系統(tǒng)進(jìn)行編譯:Python 2.6 下載
2. 調(diào)整 shell 路徑,這樣在輸入 python 時(shí)就會(huì)啟動(dòng) Python 2.6。例如,如果將 Python 編譯到 /usr/local/bin/,您就需要預(yù)先處理 $PATH 變量,從而確保它位于一個(gè)較舊的 Python 版本之前。
3. 下載并安裝設(shè)置工具:設(shè)置工具
4. 下載 Net-SNMP,除了使用其他操作系統(tǒng)所需的標(biāo)記(參見相應(yīng)的 README 文件)外,另外使用一個(gè) “--with-python-modules” 標(biāo)記進(jìn)行配置。 ./configure --with-python-modules
按如下所示編譯 Net-SNMP:
---------------------------------------------------------
Net-SNMP configuration summary:
---------------------------------------------------------
SNMP Versions Supported: 1 2c 3
Net-SNMP Version: 5.4.2.1
Building for: darwin9
Network transport support: Callback Unix TCP UDP
SNMPv3 Security Modules: usm
Agent MIB code: default_modules => snmpv3mibs mibII ucd_snmp notification
notification-log-mib target agent_mibs agentx disman/event disman/schedule utilities
Embedded Perl support: enabled
SNMP Perl modules: building -- embeddable
SNMP Python modules: building for /usr/local/bin//python
Authentication support: MD5 SHA1
Encryption support: DES AES
]]
查看以下模塊的代碼,您將隨后運(yùn)行它。
示例 3. Net-SNMP 的多進(jìn)程包裝器
#!/usr/bin/env python2.6
"""
This is a multiprocessing wrapper for Net-SNMP.
This makes a synchronous API asynchronous by combining
it with Python2.6
"""
import netsnmp
from multiprocessing import Process, Queue
class HostRecord():
"""This creates a host record"""
def __init__(self,
hostname = None,
query = None):
self.hostname = hostname
self.query = query
class SnmpSession():
"""A SNMP Session"""
def __init__(self,
oid = "sysDescr",
Version = 2,
DestHost = "localhost",
Community = "public",
Verbose = True,
):
self.oid = oid
self.Version = Version
self.DestHost = DestHost
self.Community = Community
self.Verbose = Verbose
self.var = netsnmp.Varbind(oid, 0)
self.hostrec = HostRecord()
self.hostrec.hostname = self.DestHost
def query(self):
"""Creates SNMP query
Fills out a Host Object and returns result
"""
try:
result = netsnmp.snmpget(self.var,
Version = self.Version,
DestHost = self.DestHost,
Community = self.Community)
self.hostrec.query = result
except Exception, err:
if self.Verbose:
print err
self.hostrec.query = None
finally:
return self.hostrec
def make_query(host):
"""This does the actual snmp query
This is a bit fancy as it accepts both instances
of SnmpSession and host/ip addresses. This
allows a user to customize mass queries with
subsets of different hostnames and community strings
"""
if isinstance(host,SnmpSession):
return host.query()
else:
s = SnmpSession(DestHost=host)
return s.query()
# Function run by worker processes
def worker(input, output):
for func in iter(input.get, 'STOP'):
result = make_query(func)
output.put(result)
def main():
"""Runs everything"""
#clients
hosts = ["localhost", "localhost"]
NUMBER_OF_PROCESSES = len(hosts)
# Create queues
task_queue = Queue()
done_queue = Queue()
#submit tasks
for host in hosts:
task_queue.put(host)
#Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print 'Unordered results:'
for i in range(len(hosts)):
print '\t', done_queue.get().query
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
print "Stopping Process #%s" % i
if __name__ == "__main__":
main()
這里有兩個(gè)類,一個(gè) HostRecord 類和一個(gè) SnmpSession 類。SnmpSession 類包含一個(gè)使用 Net-SNMP 的 SNMP 庫實(shí)際執(zhí)行查詢的方法。由于調(diào)用一般都會(huì)進(jìn)行阻塞,因此需要導(dǎo)入多進(jìn)程庫并使用 Process 運(yùn)行它。此外,傳入一個(gè) task_queue 和一個(gè) done_queue,這樣可以同步并保護(hù)進(jìn)出進(jìn)程池的數(shù)據(jù)。如果對(duì)線程比較熟悉的話,將會(huì)注意到這種方式非常類似于線程 API 使用的方法。
需要特別關(guān)注一下主函數(shù)中 #clients 部分的主機(jī)列表。注意,可以對(duì) 50 或 100 臺(tái)或更多主機(jī)運(yùn)行異步 SNMP 查詢,具體取決于當(dāng)前使用的硬件。NUMBER_OF_PROCESSES 變量的設(shè)置非常簡(jiǎn)單,只是被設(shè)置為主機(jī)列表中的主機(jī)數(shù)。最終,最后兩個(gè)部分在處理過程中從隊(duì)列獲取結(jié)果,然后將一個(gè) “STOP” 消息放到隊(duì)列中,表示可以終止進(jìn)程。
如果在對(duì) Net-SNMP 進(jìn)行監(jiān)聽的 OS X 機(jī)器上運(yùn)行代碼,那么將會(huì)得到如下所示的非阻塞輸出:
mac% time python multisnmp.py
Unordered results:
('Darwin mac.local 9.6.0 Darwin Kernel Version 9.6.0: Mon Nov 24 17:37:00 PST 2008;
root:xnu-1228.9.59~1/RELEASE_I386 i386',)
('Darwin mac.local 9.6.0 Darwin Kernel Version 9.6.0: Mon Nov 24 17:37:00 PST 2008;
root:xnu-1228.9.59~1/RELEASE_I386 i386',)
Stopping Process #0
Stopping Process #1
python multisnmp.py 0.18s user 0.08s system 107% cpu 0.236 total
配置 OS X 的 SNMPD
如果希望配置 OS X 的 SNMP Daemon 以針對(duì)本文進(jìn)行測(cè)試,那么需要執(zhí)行下面的操作。首先,在 shell 中使用三個(gè)命令重寫配置文件:
$ sudo cp /etc/snmp/snmpd.conf /etc/snmp/snmpd.conf.bak.testing
$ sudo echo "rocommunity public" > /etc/snmp/snmpd.conf
$ sudo snmpd
這將有效地備份您的配置,生成一個(gè)新配置,然后重新啟動(dòng) snmpd。步驟與許多 UNIX 平臺(tái)類似,但步驟 3 是除外,該步驟需要重新啟動(dòng) snmpd,或發(fā)送一個(gè) HUP。如果希望 OS X 在啟動(dòng)后永久運(yùn)行 snmpd,那么可以按如下所示編輯這個(gè) plist 文件:
/System/Library/LaunchDaemons/org.net-snmp.snmpd.plist
plist version="1.0">
Disabled
KeepAlive
Label
org.net-snmp.snmpd
OnDemand
Program
/usr/sbin/snmpd
ProgramArguments
snmpd
-f
RunAtLoad
ServiceIPC
如果希望對(duì)多臺(tái)機(jī)器進(jìn)行測(cè)試,那么使用下面的內(nèi)容替換主機(jī)行就可以輕松執(zhí)行修改:
hosts = ["192.168.1.100", SnmpSession(DestHost="example.com", Community="mysecret"),
"example.net", "example.org"]
運(yùn)行作業(yè)的 worker 函數(shù)將獲得兩個(gè)字符串形式的主機(jī)名和完整的 SnmpSession 對(duì)象。
結(jié)束語
官方文檔與多進(jìn)程庫一樣有用,您應(yīng)當(dāng)特別關(guān)注其中提到的以下這些事項(xiàng):避免共享狀態(tài);最好顯式地連接所創(chuàng)建的進(jìn)程;盡量避免終止具有共享狀態(tài)的進(jìn)程;最后確保在連接前刪除隊(duì)列中的所有隊(duì)列項(xiàng),否則將出現(xiàn)死鎖。官方文檔中提供了有關(guān)最佳實(shí)踐的更多詳細(xì)信息,因此建議您閱讀 參考資料 小節(jié)中的編程資源指南。
除了以上的注意事項(xiàng)之外,多進(jìn)程也是 Python 編程語言的一大增強(qiáng)。盡管 GIL 對(duì)線程的限制曾經(jīng)被認(rèn)為是一個(gè)弱點(diǎn),但是通過包含強(qiáng)大靈活的多進(jìn)程庫,Python 不僅彌補(bǔ)了這個(gè)弱點(diǎn),而且還得到了增強(qiáng)。非常感謝 David Goodger 擔(dān)任本文的技術(shù)審校!
參考資料
學(xué)習(xí)
* Python For Unix and Linux System Administration
* Google App Engine in Action
* Official Documentation Multiprocessing Module
* Multiprocessing Programming Resources Guide
* 使用 Python 進(jìn)行線程編程
* Expert Python Programming
* Global Interpreter Lock
* Net-SNMP
* AIX and UNIX 專區(qū):developerWorks 的“AIX and UNIX 專區(qū)”提供了大量與 AIX 系統(tǒng)管理的所有方面相關(guān)的信息,您可以利用它們來擴(kuò)展自己的 UNIX 技能。
* AIX and UNIX 新手入門:訪問“AIX and UNIX 新手入門”頁面可了解更多關(guān)于 AIX 和 UNIX 的內(nèi)容。
* AIX and UNIX 專題匯總:AIX and UNIX 專區(qū)已經(jīng)為您推出了很多的技術(shù)專題,為您總結(jié)了很多熱門的知識(shí)點(diǎn)。我們?cè)诤竺孢會(huì)繼續(xù)推出很多相關(guān)的熱門專題給您,為了方便您的訪問,我們?cè)谶@里為您把本專區(qū)的所有專題進(jìn)行匯總,讓您更方便的找到您需要的內(nèi)容。
* 瀏覽 技術(shù)書店,查找有關(guān)這個(gè)主題和其他技術(shù)主題的圖書。
獲得產(chǎn)品和技術(shù)
* 可以免費(fèi) 下載 Squirrel Shell。
討論
* 參與 developerWorks blogs 并加入 developerWorks 社區(qū)。
* 參與 AIX and UNIX 論壇:
o AIX 論壇
o 針對(duì)開發(fā)人員的 AIX 論壇
o Cluster Systems Management
o IBM Support Assistant 論壇
o 性能工具論壇
o 虛擬化論壇
o 更多 AIX and UNIX 論壇
關(guān)于作者
Photo of Noah Gift
Noah Gift 是 O'Reilly 出版的 Python For Unix and Linux System Administration 一書的合著者,并且現(xiàn)在還在為 Manning 編著 Google App Engine In Action 一書。他是一名作家、演說家、顧問和社區(qū)負(fù)責(zé)人,并為 IBM developerWorks、Red Hat Magazine、O'Reilly 和 MacTech 撰稿。他的咨詢公司的網(wǎng)站是 http://www.giftcs.com,他的個(gè)人網(wǎng)站是 http://noahgift.com。 Noah 擁有加州洛杉磯的 CIS 的碩士學(xué)位,加州 Poly San Luis Obispo 的營(yíng)養(yǎng)科學(xué)學(xué)士學(xué)位,他還是通過 Apple 和 LPI 認(rèn)證的系統(tǒng)管理員,他曾經(jīng)在許多公司工作過,如加利福尼亞理工學(xué)院、Disney Feature Animation、Sony Imageworks 和 Turner Studios。他目前在新西蘭的 Weta Digital 工作。在空閑的時(shí)候,他喜歡和妻子 Leah 以及他們的兒子 Liam 一起度過,譜寫鋼琴曲、參加馬拉松比賽以及積極地參與體育活動(dòng)。
本文來自ChinaUnix博客,如果查看原文請(qǐng)點(diǎn):http://blog.chinaunix.net/u/21908/showart_2092140.html |
|