歡迎您光臨本站 註冊首頁

使用 Python 實現多進程

←手機掃碼閱讀     火星人 @ 2014-03-12 , reply:0
  
通過使用 Python 2.6 內置的多進程模塊,將您的 Unix® Python 應用程序擴展為使用多核。多進程模擬了 Python 線程 API 的部分功能,讓開發人員能夠對多組進程進行高級控制,同時也合併了許多特定於進程的額外特性。

簡介

在 IBM® developerWorks® 的 早期文章 中,我演示了使用 Python 實現線程式編程的一種簡單且有效的模式。但是,這種方法的一個缺陷就是它並不總是能夠提高應用程序的速度,因為全局解釋器鎖(Global Interpreter Lock,GIL)將線程有效地限制到一個核中。如果需要使用計算機中的所有核,那麼通常都需通過 對 經常使用 fork 操作來實現,從而提高速度。處理進程組是件困難的事情,因為為了在進程之間進行通信,需要對所有調用進行協調,這通常會使事情變得更複雜。

幸運的是,自 2.6 版本起,Python 包括了一個名為 “多進程(multiprocessing)” 的模塊來幫助處理進程。該進程模塊的 API 與線程 API 的工作方式有些相似點,但是也存在一些需要特別注意的不同之處。主要區別之一就是進程擁有的一些微妙的底層行為,這是高級 API 永遠無法完全抽象出來的。可以從多進程模塊的官方文檔中了解有關這方面內容(參見 參考資料 小節)。

fork 簡介

進程和線程在併發性的工作原理方面存在一些明顯的差異。通過閱讀我撰寫的有關線程的 developerWorks 文章,可以進一步了解這些差異(參見 參考資料)。在進程執行 fork 時,操作系統將創建具有新進程 ID 的新的子進程,複製父進程的狀態(內存、環境變數等)。首先,在我們實際使用進程模塊之前,先看一下 Python 中的一個非常基本的 fork 操作。


fork.py
#!/usr/bin/env python    """A basic fork in action"""    import os    def my_fork():      child_pid = os.fork()      if child_pid == 0:          print "Child Process: PID# %s" % os.getpid()      else:          print "Parent Process: PID# %s" % os.getpid()    if __name__ == "__main__":      my_fork()    

現在來看一下以上代碼的輸出:

mac% python fork.py  Parent Process: PID# 5285  Child Process: PID# 5286  

在下一個示例中,增強初始 fork 的代碼,並設置一個環境變數。該環境變數隨後將被複制到子進程中。下面給出了相應的代碼:


示例 1. Python 中的 fork 操作
#!/usr/bin/env python    """A fork that demonstrates a copied environment"""    import os  from os import environ    def my_fork():      environ['FOO']="baz"      print "FOO environmental variable set to: %s" % environ['FOO']      environ['FOO']="bar"      print "FOO environmental variable changed to: %s" % environ['FOO']      child_pid = os.fork()      if child_pid == 0:          print "Child Process: PID# %s" % os.getpid()          print "Child FOO environmental variable == %s" % environ['FOO']      else:          print "Parent Process: PID# %s" % os.getpid()          print "Parent FOO environmental variable == %s" % environ['FOO']    if __name__ == "__main__":      my_fork()    

下面給出了 fork 的輸出:

mac% python env_fork.py  FOO environmental variable set to: baz  FOO environmental variable changed to: bar  Parent Process: PID# 5333  Parent FOO environmental variable == bar  Child Process: PID# 5334  Child FOO environmental variable == bar  

在輸出中,可以看到 “修改後的” 環境變數 FOO 留在了子進程和父進程中。您可以通過在父進程中再次修改環境變數來進一步測試這個示例,您將看到子進程現在是完全獨立的,它有了自己的生命。注意,子進程模塊也可用於 fork 進程,但是實現方式沒有多進程模塊那麼複雜。

多進程簡介

現在您已經了解 Python fork 操作的基本知識,讓我們通過一個簡單例子了解它在更高級的多進程庫中的使用。在這個示例中,仍然會出現 fork,但是已經為我們處理了大部分標準工作。


示例 2. 簡單的多進程
#!/usr/bin/env python  from multiprocessing import Process  import os  import time    def sleeper(name, seconds):     print 'starting child process with id: ', os.getpid()     print 'parent process:', os.getppid()     print 'sleeping for %s ' % seconds     time.sleep(seconds)     print "Done sleeping"      if __name__ == '__main__':     print "in parent process (id %s)" % os.getpid()     p = Process(target=sleeper, args=('bob', 5))     p.start()     print "in parent process after child process start"     print "parent process about to join child process"     p.join()     print "in parent process after child process join"      print "parent process exiting with id ", os.getpid()     print "The parent's parent process:", os.getppid()  

如果查看輸出,將會看到下面的內容:

mac% python simple.py   in parent process (id 5245)  in parent process after child process start  parent process about to join child process  starting child process with id:  5246  parent process: 5245  sleeping for 5   Done sleeping  in parent process after child process join  parent process exiting with id  5245  The parent's parent process: 5231    

可以看到從主進程分出了一個子進程,該子進程隨後休眠了 5 秒種。子進程分配是在調用 p.start() 時發生的。在下一節中,您將看到這個基礎的程序將擴展為更大的程序。

構建非同步 Net-SNMP 引擎

到目前為止,您尚未構建任何特別有用的內容。下一個示例將解決一個實際問題,為 Net-SNMP 非同步生成 Python 綁定。默認情況下,Net-SNMP 將阻塞每一個 Python 調用。使用多進程庫可以非常簡單地將 Net-SNMP 庫轉換為完全非同步的操作。

在開始之前,需要檢查是否安裝了一些必備的內容,以便使用 Python 2.6 多進程庫和 Net-SNMP 綁定:

  1. 下載 Python 2.6 並針對所使用的操作系統進行編譯:Python 2.6 下載
  2. 調整 shell 路徑,這樣在輸入 python 時就會啟動 Python 2.6。例如,如果將 Python 編譯到 /usr/local/bin/,您就需要預先處理 $PATH 變數,從而確保它位於一個較舊的 Python 版本之前。
  3. 下載並安裝設置工具:設置工具
  4. 下載 Net-SNMP,除了使用其他操作系統所需的標記(參見相應的 README 文件)外,另外使用一個 “--with-python-modules” 標記進行配置。 ./configure --with-python-modules

按如下所示編譯 Net-SNMP:

---------------------------------------------------------              Net-SNMP configuration summary:  ---------------------------------------------------------      SNMP Versions Supported:    1 2c 3    Net-SNMP Version:           5.4.2.1    Building for:               darwin9    Network transport support:  Callback Unix TCP UDP    SNMPv3 Security Modules:     usm    Agent MIB code:             default_modules =>  snmpv3mibs mibII ucd_snmp notification   notification-log-mib target agent_mibs agentx disman/event disman/schedule utilities    Embedded Perl support:      enabled    SNMP Perl modules:          building -- embeddable    SNMP Python modules:        building for /usr/local/bin//python    Authentication support:     MD5 SHA1    Encryption support:         DES AES  ]]  

查看以下模塊的代碼,您將隨後運行它。


示例 3. Net-SNMP 的多進程包裝器
#!/usr/bin/env python2.6  """  This is a multiprocessing wrapper for Net-SNMP.  This makes a synchronous API asynchronous by combining  it with Python2.6  """    import netsnmp  from multiprocessing import Process, Queue, current_process    class HostRecord():      """This creates a host record"""      def __init__(self,                   hostname = None,                   query = None):          self.hostname = hostname          self.query = query    class SnmpSession():      """A SNMP Session"""      def __init__(self,                  oid = "sysDescr",                  Version = 2,                  DestHost = "localhost",                  Community = "public",                  Verbose = True,                  ):          self.oid = oid          self.Version = Version          self.DestHost = DestHost          self.Community = Community          self.Verbose = Verbose          self.var = netsnmp.Varbind(oid, 0)          self.hostrec = HostRecord()          self.hostrec.hostname = self.DestHost        def query(self):          """Creates SNMP query            Fills out a Host Object and returns result          """          try:              result = netsnmp.snmpget(self.var,                                  Version = self.Version,                                  DestHost = self.DestHost,                                  Community = self.Community)              self.hostrec.query = result          except Exception, err:              if self.Verbose:                  print err              self.hostrec.query = None          finally:              return self.hostrec    def make_query(host):      """This does the actual snmp query        This is a bit fancy as it accepts both instances      of SnmpSession and host/ip addresses.  This      allows a user to customize mass queries with      subsets of different hostnames and community strings      """      if isinstance(host,SnmpSession):          return host.query()      else:          s = SnmpSession(DestHost=host)          return s.query()    # Function run by worker processes  def worker(input, output):      for func in iter(input.get, 'STOP'):          result = make_query(func)          output.put(result)    def main():      """Runs everything"""        #clients      hosts = ["localhost", "localhost"]      NUMBER_OF_PROCESSES = len(hosts)        # Create queues      task_queue = Queue()      done_queue = Queue()        #submit tasks      for host in hosts:          task_queue.put(host)        #Start worker processes      for i in range(NUMBER_OF_PROCESSES):          Process(target=worker, args=(task_queue, done_queue)).start()         # Get and print results      print 'Unordered results:'      for i in range(len(hosts)):          print '\t', done_queue.get().query        # Tell child processes to stop      for i in range(NUMBER_OF_PROCESSES):          task_queue.put('STOP')          print "Stopping Process #%s" % i    if __name__ == "__main__":      main()  

這裡有兩個類,一個 HostRecord 類和一個 SnmpSession 類。SnmpSession 類包含一個使用 Net-SNMP 的 SNMP 庫實際執行查詢的方法。由於調用一般都會進行阻塞,因此需要導入多進程庫並使用 Process 運行它。此外,傳入一個 task_queue 和一個 done_queue,這樣可以同步並保護進出進程池的數據。如果對線程比較熟悉的話,將會注意到這種方式非常類似於線程 API 使用的方法。

需要特別關注一下主函數中 #clients 部分的主機列表。注意,可以對 50 或 100 台或更多主機運行非同步 SNMP 查詢,具體取決於當前使用的硬體。NUMBER_OF_PROCESSES 變數的設置非常簡單,只是被設置為主機列表中的主機數。最終,最後兩個部分在處理過程中從隊列獲取結果,然後將一個 “STOP” 消息放到隊列中,表示可以終止進程。

如果在對 Net-SNMP 進行監聽的 OS X 機器上運行代碼,那麼將會得到如下所示的非阻塞輸出:

mac% time python multisnmp.py  Unordered results:       ('Darwin mac.local 9.6.0 Darwin Kernel Version 9.6.0: Mon Nov 24 17:37:00 PST 2008;  root:xnu-1228.9.59~1/RELEASE_I386 i386',)       ('Darwin mac.local 9.6.0 Darwin Kernel Version 9.6.0: Mon Nov 24 17:37:00 PST 2008;  root:xnu-1228.9.59~1/RELEASE_I386 i386',)  Stopping Process #0  Stopping Process #1  python multisnmp.py  0.18s user 0.08s system 107% cpu 0.236 total  

配置 OS X 的 SNMPD

如果希望配置 OS X 的 SNMP Daemon 以針對本文進行測試,那麼需要執行下面的操作。首先,在 shell 中使用三個命令重寫配置文件:

            $ sudo cp /etc/snmp/snmpd.conf /etc/snmp/snmpd.conf.bak.testing              $ sudo echo "rocommunity public" > /etc/snmp/snmpd.conf               $ sudo snmpd           

這將有效地備份您的配置,生成一個新配置,然後重新啟動 snmpd。步驟與許多 UNIX 平台類似,但步驟 3 是除外,該步驟需要重新啟動 snmpd,或發送一個 HUP。如果希望 OS X 在啟動后永久運行 snmpd,那麼可以按如下所示編輯這個 plist 文件:

/System/Library/LaunchDaemons/org.net-snmp.snmpd.plist
        <?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE plist PUBLIC "-//Apple//DTD   PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">   plist version="1.0">   <dict>  	<key>Disabled</key>       	<false/>       	<key>KeepAlive</key>       	<true/>       	<key>Label</key>       	<string>org.net-snmp.snmpd</string>       	<key>OnDemand</key>       	<false/>       	<key>Program</key>       	<string>/usr/sbin/snmpd</string>       	<key>ProgramArguments</key>       	<array>           		<string>snmpd</string>  	        <string>-f</string>       	</array>      	 <key>RunAtLoad</key>       	<true/>       	<key>ServiceIPC</key>       	<false/>   </dict>   </plist>                   

如果希望對多台機器進行測試,那麼使用下面的內容替換主機行就可以輕鬆執行修改:

hosts = ["192.168.1.100", SnmpSession(DestHost="example.com", Community="mysecret"),   "example.net", "example.org"]  

運行作業的 worker 函數將獲得兩個字元串形式的主機名和完整的 SnmpSession 對象。

結束語

官方文檔與多進程庫一樣有用,您應當特別關注其中提到的以下這些事項:避免共享狀態;最好顯式地連接所創建的進程;盡量避免終止具有共享狀態的進程;最後確保在連接前刪除隊列中的所有隊列項,否則將出現死鎖。官方文檔中提供了有關最佳實踐的更多詳細信息,因此建議您閱讀 參考資料 小節中的編程資源指南。

除了以上的注意事項之外,多進程也是 Python 編程語言的一大增強。儘管 GIL 對線程的限制曾經被認為是一個弱點,但是通過包含強大靈活的多進程庫,Python 不僅彌補了這個弱點,而且還得到了增強。非常感謝 David Goodger 擔任本文的技術審校!(責任編輯:A6)



[火星人 ] 使用 Python 實現多進程已經有981次圍觀

http://coctec.com/docs/linux/show-post-68879.html