|
最近會開始給大家出些Python進階系列的文章,這是該系列的第一篇文章,介紹進程和線程的知識,本次的Python基礎(chǔ)教程則跟大家介紹下進程和線程的概念,多進程和多線程各自的實現(xiàn)方法和優(yōu)缺點,以及分別在哪些情況采用多進程,或者是多線程。 概念 并發(fā)編程就是實現(xiàn)讓程序同時執(zhí)行多個任務(wù),而如何實現(xiàn)并發(fā)編程呢,這里就涉及到進程和線程這兩個概念。 對于操作系統(tǒng)來說,一個任務(wù)(或者程序)就是一個進程(Process),比如打開一個瀏覽器是開啟一個瀏覽器進程,打開微信就啟動了一個微信的進程,打開兩個記事本,就啟動兩個記事本進程。 進程的特點有: 操作系統(tǒng)以進程為單位分配存儲空間, 每個進程有自己的地址空間、數(shù)據(jù)棧以及其他用于跟蹤進程執(zhí)行的輔助數(shù)據(jù); 進程可以通過 fork 或者 spawn 方式創(chuàng)建新的進程來執(zhí)行其他任務(wù) 進程都有自己獨立的內(nèi)存空間,所以進程需要通過進程間通信機制(IPC,Inter-Process Communication)來實現(xiàn)數(shù)據(jù)共享,具體的方式包括管道、信號、套接字、共享內(nèi)存區(qū)等
一個進程還可以同時做多件事情,比如在 Word 里面同時進行打字、拼音檢查、打印等事情,也就是一個任務(wù)分為多個子任務(wù)同時進行,這些進程內(nèi)的子任務(wù)被稱為線程(Thread)。 
因為每個進程至少需要完成一件事情,也就是一個進程至少有一個線程。當(dāng)要實現(xiàn)并發(fā)編程,也就是同時執(zhí)行多任務(wù)時,有以下三種解決方案: 多進程,每個進程只有一個線程,但多個進程一起執(zhí)行多個任務(wù); 多線程,只啟動一個進程,但一個進程內(nèi)開啟多個線程; 多進程+多線程,即啟動多個進程,每個進程又啟動多個線程,但這種方法非常復(fù)雜,實際很少使用
注意:真正的并行執(zhí)行多任務(wù)只有在多核 CPU 上才可以實現(xiàn),單核 CPU 系統(tǒng)中,真正的并發(fā)是不可能的,因為在某個時刻能夠獲得CPU的只有唯一的一個線程,多個線程共享了CPU的執(zhí)行時間。 Python 是同時支持多進程和多線程的,下面就分別介紹多進程和多線程。 多進程 在 Unix/Linux 系統(tǒng)中,提供了一個 fork() 系統(tǒng)調(diào)用,它是一個特殊的函數(shù),普通函數(shù)調(diào)用是調(diào)用一次,返回一次,但 fork 函數(shù)調(diào)用一次,返回兩次,因為調(diào)用該函數(shù)的是父進程,然后復(fù)制出一份子進程了,最后同時在父進程和子進程內(nèi)返回,所以會返回兩次。 子進程返回的永遠是 0 ,而父進程會返回子進程的 ID,因為父進程可以復(fù)制多個子進程,所以需要記錄每個子進程的 ID,而子進程可以通過調(diào)用 getpid() 獲取父進程的 ID。 Python 中 os 模塊封裝了常見的系統(tǒng)調(diào)用,這就包括了 fork ,代碼示例如下: import os print('Process (%s) start...' % os.getpid()) # Only works on Unix/Linux/Mac: pid = os.fork() if pid == 0: print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())) else: print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
運行結(jié)果: Process (876) start... I (876) just created a child process (877). I am child process (877) and my parent is 876.
由于 windows 系統(tǒng)中是不存在 fork ,所以上述函數(shù)無法調(diào)用,但 Python 是跨平臺的,所以也還是有其他模塊可以實現(xiàn)多進程的功能,比如 multiprocessing模塊。 multiprocess multiprocessing 模塊中提供了 Process 類來代表一個進程對象,接下來用一個下載文件的例子來說明采用多進程和不用多進程的差別。 首先是不采用多進程的例子: def download_task(filename): '''模擬下載文件''' print('開始下載%s...' % filename) time_to_download = randint(5, 10) sleep(time_to_download) print('%s下載完成! 耗費了%d秒' % (filename, time_to_download)) def download_without_multiprocess(): '''不采用多進程''' start = time() download_task('Python.pdf') download_task('nazha.mkv') end = time() print('總共耗費了%.2f秒.' % (end - start)) if __name__ == '__main__': download_without_multiprocess()
運行結(jié)果如下,這里用 randint 函數(shù)來隨機輸出當(dāng)前下載文件的耗時,從結(jié)果看,程序運行時間等于兩個下載文件的任務(wù)時間總和。 開始下載Python.pdf... Python.pdf下載完成! 耗費了9秒 開始下載nazha.mkv... nazha.mkv下載完成! 耗費了9秒 總共耗費了18.00秒.
如果是采用多進程,例子如下所示: def download_task(filename): '''模擬下載文件''' print('開始下載%s...' % filename) time_to_download = randint(5, 10) sleep(time_to_download) print('%s下載完成! 耗費了%d秒' % (filename, time_to_download)) def download_multiprocess(): '''采用多進程''' start = time() p1 = Process(target=download_task, args=('Python.pdf',)) p1.start() p2 = Process(target=download_task, args=('nazha.mkv',)) p2.start() p1.join() p2.join() end = time() print('總共耗費了%.2f秒.' % (end - start)) if __name__ == '__main__': download_multiprocess()
這里多進程例子中,我們通過 Process 類創(chuàng)建了進程對象,通過 target 參數(shù)傳入一個函數(shù)表示進程需要執(zhí)行的任務(wù),args 是一個元組,表示傳遞給函數(shù)的參數(shù),然后采用 start 來啟動進程,而 join 方法表示等待進程執(zhí)行結(jié)束。 運行結(jié)果如下所示,耗時就不是兩個任務(wù)執(zhí)行時間總和,速度上也是大大的提升了。 開始下載Python.pdf... 開始下載nazha.mkv... Python.pdf下載完成! 耗費了5秒 nazha.mkv下載完成! 耗費了9秒 總共耗費了9.36秒.
Pool 上述例子是開啟了兩個進程,但如果需要開啟大量的子進程,上述代碼的寫法就不合適了,應(yīng)該采用進程池的方式批量創(chuàng)建子進程,還是用下載文件的例子,但執(zhí)行下部分的代碼如下所示: import os from multiprocessing import Process, Pool from random import randint from time import time, sleep def download_multiprocess_pool(): '''采用多進程,并用 pool 管理進程池''' start = time() filenames = ['Python.pdf', 'nazha.mkv', 'something.mp4', 'lena.png', 'lol.avi'] # 進程池 p = Pool(5) for i in range(5): p.apply_async(download_task, args=(filenames[i], )) print('Waiting for all subprocesses done...') # 關(guān)閉進程池 p.close() # 等待所有進程完成任務(wù) p.join() end = time() print('總共耗費了%.2f秒.' % (end - start)) if __name__ == '__main__': download_multiprocess_pool()
代碼中 Pool 對象先創(chuàng)建了 5 個進程,然后 apply_async 方法就是并行啟動進程執(zhí)行任務(wù)了,調(diào)用 join() 方法之前必須先調(diào)用 close() ,close() 主要是關(guān)閉進程池,所以執(zhí)行該方法后就不能再添加新的進程對象了。然后 join() 就是等待所有進程執(zhí)行完任務(wù)。 運行結(jié)果如下所示: Waiting for all subprocesses done... 開始下載Python.pdf... 開始下載nazha.mkv... 開始下載something.mp4... 開始下載lena.png... 開始下載lol.avi... nazha.mkv下載完成! 耗費了5秒 lena.png下載完成! 耗費了6秒 something.mp4下載完成! 耗費了7秒 Python.pdf下載完成! 耗費了8秒 lol.avi下載完成! 耗費了9秒 總共耗費了9.80秒.
子進程 大多數(shù)情況,子進程是一個外部進程,而非自身。在創(chuàng)建子進程后,我們還需要控制子進程的輸入和輸出。 subprocess 模塊可以讓我們很好地開啟子進程以及管理子進程的輸入和輸出。 下面是演示如何用 Python 演示命令 nslookup www.python.org,代碼如下所示: import subprocess print('$ nslookup www.python.org') r = subprocess.call(['nslookup', 'www.python.org']) print('Exit code:', r)
運行結(jié)果: $ nslookup www.python.org Server: 192.168.19.4 Address: 192.168.19.4#53 Non-authoritative answer: www.python.org canonical name = python.map.fastly.net. Name: python.map.fastly.net Address: 199.27.79.223 Exit code: 0
如果子進程需要輸入,可以通過 communicate() 進行輸入,代碼如下所示: import subprocess print('$ nslookup') p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, err = p.communicate(b'set q=mx\npython.org\nexit\n') print(output.decode('utf-8')) print('Exit code:', p.returncode)
這段代碼就是執(zhí)行命令 nslookup 時,輸入: set q=mx python.org exit
運行結(jié)果: $ nslookup Server: 192.168.19.4 Address: 192.168.19.4#53 Non-authoritative answer: python.org mail exchanger = 50 mail.python.org. Authoritative answers can be found from: mail.python.org internet address = 82.94.164.166 mail.python.org has AAAA address 2001:888:2000:d::a6 Exit code: 0
進程間通信 進程之間是需要通信的,multiprocess 模塊中也提供了 Queue、Pipes 等多種方式來交換數(shù)據(jù)。 這里以 Queue 為例,在父進程創(chuàng)建兩個子進程,一個往 Queue 寫入數(shù)據(jù),另一個從 Queue 讀取數(shù)據(jù)。代碼如下: import os from multiprocessing import Process, Queue import random from time import time, sleep # 寫數(shù)據(jù)進程執(zhí)行的代碼: def write(q): print('Process to write: %s' % os.getpid()) for value in ['A', 'B', 'C']: print('Put %s to queue...' % value) q.put(value) sleep(random.random()) # 讀數(shù)據(jù)進程執(zhí)行的代碼: def read(q): print('Process to read: %s' % os.getpid()) while True: value = q.get(True) print('Get %s from queue.' % value) def ipc_queue(): ''' 采用 Queue 實現(xiàn)進程間通信 :return: ''' # 父進程創(chuàng)建Queue,并傳給各個子進程: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 啟動子進程pw,寫入: pw.start() # 啟動子進程pr,讀取: pr.start() # 等待pw結(jié)束: pw.join() # pr進程里是死循環(huán),無法等待其結(jié)束,只能強行終止: pr.terminate() if __name__ == '__main__': ipc_queue()
運行結(jié)果如下所示: Process to write: 24992 Put A to queue... Process to read: 22836 Get A from queue. Put B to queue... Get B from queue. Put C to queue... Get C from queue.
本Python基礎(chǔ)教程主要是介紹進程和線程的概念,然后就是介紹多進程及其實現(xiàn)方式,在下一篇文章會介紹多線程的實現(xiàn),以及兩種方式應(yīng)該如何選擇。
|