大良營(yíng)銷網(wǎng)站建設(shè)信息整站seo排名要多少錢(qián)
目錄
- 1. 引言
- 1.1 并發(fā)與并行的區(qū)別
- 2. 多進(jìn)程開(kāi)發(fā)
- 2.1 `Process` 類的常用方法
- 2.2 進(jìn)程的生命周期與同步
- 3. 進(jìn)程之間的數(shù)據(jù)共享
- 3.1 使用 `Value` 和 `Array`
- 3.2 使用 `Manager` 實(shí)現(xiàn)高級(jí)數(shù)據(jù)共享
- 4. 進(jìn)程鎖
- 4.1 更復(fù)雜的鎖應(yīng)用
- 4.2 鎖的死鎖與避免
- 4.3 信號(hào)量與條件變量
- 5. 進(jìn)程池
- 5.1 imap與starmap
- 5.2 apply與apply_async
1. 引言
在當(dāng)今計(jì)算密集型應(yīng)用和數(shù)據(jù)密集型任務(wù)中,高效的并發(fā)處理能力顯得尤為重要。Python 提供了多種并發(fā)編程的方式來(lái)提升程序性能,其中 multiprocessing
模塊作為一種基于多進(jìn)程的并行處理方案,受到了廣泛關(guān)注和使用。與多線程不同,multiprocessing
能夠在多核處理器上真正實(shí)現(xiàn)并行執(zhí)行,突破了 Python 中因全局解釋器鎖(GIL)限制而導(dǎo)致的并發(fā)瓶頸。因此,multiprocessing
模塊在處理需要充分利用 CPU 計(jì)算能力的任務(wù)時(shí)尤其高效。
1.1 并發(fā)與并行的區(qū)別
要理解 Python multiprocessing
模塊的優(yōu)勢(shì),首先需要區(qū)分并發(fā)和并行這兩個(gè)概念。并發(fā)是指在同一時(shí)間段內(nèi)處理多個(gè)任務(wù),這些任務(wù)并不一定是同時(shí)運(yùn)行的,而是交替執(zhí)行的,主要用于提高程序響應(yīng)能力。相較之下,并行意味著在同一時(shí)刻同時(shí)執(zhí)行多個(gè)任務(wù),通常運(yùn)行在不同的 CPU 核心上,可以顯著提升計(jì)算性能。multiprocessing
模塊旨在通過(guò)創(chuàng)建獨(dú)立的進(jìn)程來(lái)實(shí)現(xiàn)并行計(jì)算,從而讓程序能夠同時(shí)執(zhí)行多個(gè)任務(wù),最大限度地利用多核處理器的性能。
2. 多進(jìn)程開(kāi)發(fā)
Python 的 multiprocessing
模塊提供了強(qiáng)大的多進(jìn)程支持,使得開(kāi)發(fā)者能夠充分利用現(xiàn)代多核 CPU 的處理能力。通過(guò) multiprocessing
模塊,開(kāi)發(fā)者可以輕松地創(chuàng)建和管理多個(gè)獨(dú)立運(yùn)行的子進(jìn)程,從而顯著提高程序的并發(fā)性能。不同于多線程開(kāi)發(fā),multiprocessing
通過(guò)在操作系統(tǒng)層面創(chuàng)建進(jìn)程來(lái)避免全局解釋器鎖(GIL)的限制,因此更適合 CPU 密集型任務(wù)。
multiprocessing
中的 Process
類是創(chuàng)建和控制子進(jìn)程的核心工具。開(kāi)發(fā)者可以將目標(biāo)函數(shù)和其參數(shù)傳遞給 Process
對(duì)象,并通過(guò)調(diào)用其方法來(lái)啟動(dòng)、控制和管理進(jìn)程的執(zhí)行。下面是一個(gè)簡(jiǎn)單的示例,展示了如何使用 Process
類來(lái)創(chuàng)建和啟動(dòng)一個(gè)進(jìn)程:
import multiprocessing
import os
import timedef worker_function(name, duration):print(f'Process {name} (PID: {os.getpid()}) started.')time.sleep(duration) # 模擬耗時(shí)任務(wù)print(f'Process {name} (PID: {os.getpid()}) finished.')if __name__ == "__main__":# 創(chuàng)建一個(gè)進(jìn)程對(duì)象,并指定目標(biāo)函數(shù)和參數(shù)process = multiprocessing.Process(target=worker_function, args=('TestProcess', 3))process.start() # 啟動(dòng)進(jìn)程process.join() # 等待進(jìn)程結(jié)束print("Main process continues after child process.")
在這段代碼中,Process
對(duì)象被創(chuàng)建并傳入 worker_function
作為目標(biāo)函數(shù)。args
參數(shù)用來(lái)指定傳遞給目標(biāo)函數(shù)的參數(shù)。start()
方法用于啟動(dòng)該進(jìn)程,進(jìn)程會(huì)在后臺(tái)運(yùn)行,主進(jìn)程不會(huì)等待它結(jié)束而是繼續(xù)執(zhí)行。如果需要等待子進(jìn)程完成,開(kāi)發(fā)者可以使用 join()
方法。join()
會(huì)阻塞主進(jìn)程,直到子進(jìn)程完成,這對(duì)于需要保證執(zhí)行順序的場(chǎng)景非常有用。
2.1 Process
類的常用方法
Process
類不僅提供了創(chuàng)建和啟動(dòng)進(jìn)程的基礎(chǔ)方法,還包含其他一些有用的方法和屬性,幫助開(kāi)發(fā)者更好地控制和管理進(jìn)程。
start()
方法用于啟動(dòng)進(jìn)程。調(diào)用該方法后,操作系統(tǒng)會(huì)為進(jìn)程分配內(nèi)存空間并開(kāi)始執(zhí)行target
指定的函數(shù)。join()
方法用于讓主進(jìn)程等待子進(jìn)程的結(jié)束。如果不調(diào)用join()
,主進(jìn)程會(huì)繼續(xù)執(zhí)行而不管子進(jìn)程的狀態(tài)。is_alive()
方法返回一個(gè)布爾值,指示進(jìn)程是否仍在運(yùn)行。這對(duì)于在程序中動(dòng)態(tài)檢查進(jìn)程狀態(tài)非常有用。terminate()
方法用于立即結(jié)束進(jìn)程。它會(huì)向操作系統(tǒng)發(fā)送信號(hào)終止進(jìn)程,可能導(dǎo)致進(jìn)程未完成清理任務(wù)時(shí)被強(qiáng)行結(jié)束,因此應(yīng)謹(jǐn)慎使用。pid
屬性返回當(dāng)前進(jìn)程的 PID(進(jìn)程 ID),可以用來(lái)唯一標(biāo)識(shí)一個(gè)進(jìn)程。name
屬性用于獲取或設(shè)置進(jìn)程的名稱,便于調(diào)試和日志記錄。
以下是一個(gè)更復(fù)雜的示例,展示了如何使用這些方法和屬性來(lái)管理和跟蹤多個(gè)進(jìn)程:
import multiprocessing
import os
import timedef worker_function(name, duration):print(f'Process {name} (PID: {os.getpid()}) started.')time.sleep(duration)print(f'Process {name} (PID: {os.getpid()}) finished.')if __name__ == "__main__":processes = []for i in range(3):process = multiprocessing.Process(target=worker_function, args=(f'Worker-{i}', 2 + i))process.name = f'CustomProcess-{i}'processes.append(process)process.start()# 檢查所有進(jìn)程狀態(tài)for process in processes:print(f'{process.name} (PID: {process.pid}) is_alive: {process.is_alive()}')# 等待所有進(jìn)程結(jié)束for process in processes:process.join()print("All child processes have finished.")
我們創(chuàng)建了三個(gè)子進(jìn)程,并將它們添加到 processes
列表中。每個(gè)進(jìn)程在啟動(dòng)后會(huì)輸出其名稱和 PID,然后執(zhí)行一個(gè)模擬的耗時(shí)任務(wù)。主進(jìn)程使用 is_alive()
方法來(lái)檢查每個(gè)子進(jìn)程的狀態(tài),并在所有子進(jìn)程結(jié)束后繼續(xù)執(zhí)行。通過(guò)這種方式,開(kāi)發(fā)者可以輕松地在程序中管理多個(gè)并發(fā)任務(wù)。
2.2 進(jìn)程的生命周期與同步
在多進(jìn)程編程中,了解進(jìn)程的生命周期非常重要。每個(gè) Process
對(duì)象都經(jīng)歷以下幾個(gè)階段:
-
新建 (New):在創(chuàng)建進(jìn)程時(shí)使用
multiprocessing.Process()
方法,進(jìn)程會(huì)處于新建狀態(tài)。此時(shí),進(jìn)程對(duì)象已被創(chuàng)建但尚未啟動(dòng)。 -
就緒 (Ready):當(dāng)調(diào)用
start()
方法時(shí),進(jìn)程進(jìn)入就緒狀態(tài)。此時(shí),進(jìn)程等待操作系統(tǒng)的調(diào)度分配資源以準(zhǔn)備運(yùn)行。 -
運(yùn)行 (Running):當(dāng)進(jìn)程被操作系統(tǒng)調(diào)度并獲得 CPU 執(zhí)行權(quán)限時(shí),進(jìn)入運(yùn)行狀態(tài),開(kāi)始執(zhí)行其目標(biāo)函數(shù)。
-
阻塞 (Blocked):進(jìn)程在運(yùn)行時(shí)若需要等待資源或 I/O 操作,會(huì)進(jìn)入阻塞狀態(tài)。進(jìn)程在此狀態(tài)暫停執(zhí)行,直到資源變得可用。
-
終止 (Terminated):當(dāng)進(jìn)程完成其任務(wù)或調(diào)用
terminate()
方法時(shí),進(jìn)入終止?fàn)顟B(tài),結(jié)束其生命周期,釋放資源。
進(jìn)程的同步問(wèn)題是多進(jìn)程開(kāi)發(fā)中需要重點(diǎn)考慮的內(nèi)容。當(dāng)多個(gè)進(jìn)程同時(shí)訪問(wèn)共享資源時(shí),可能會(huì)出現(xiàn)數(shù)據(jù)不一致或競(jìng)態(tài)條件。為了解決這個(gè)問(wèn)題,可以使用 multiprocessing
提供的鎖機(jī)制。Lock
對(duì)象允許開(kāi)發(fā)者在需要的地方進(jìn)行同步,確保同一時(shí)刻只有一個(gè)進(jìn)程能夠訪問(wèn)共享資源。
import multiprocessingdef critical_section(lock, shared_list, item):with lock:shared_list.append(item)print(f'Item {item} added by {multiprocessing.current_process().name}')if __name__ == "__main__":lock = multiprocessing.Lock()manager = multiprocessing.Manager()shared_list = manager.list()processes = [multiprocessing.Process(target=critical_section, args=(lock, shared_list, i)) for i in range(5)]for p in processes:p.start()for p in processes:p.join()print("Final shared list:", list(shared_list))
我們創(chuàng)建了一個(gè) Lock
對(duì)象并在多個(gè)進(jìn)程中共享。當(dāng)進(jìn)程試圖訪問(wèn)共享資源 shared_list
時(shí),必須先獲取鎖,這樣可以防止多個(gè)進(jìn)程同時(shí)修改數(shù)據(jù)而導(dǎo)致的數(shù)據(jù)競(jìng)爭(zhēng)問(wèn)題。with lock:
確保鎖在代碼塊執(zhí)行完成后自動(dòng)釋放,從而實(shí)現(xiàn)了進(jìn)程的同步。
3. 進(jìn)程之間的數(shù)據(jù)共享
在多進(jìn)程開(kāi)發(fā)中,進(jìn)程之間的數(shù)據(jù)共享是一個(gè)常見(jiàn)且重要的需求。由于每個(gè)進(jìn)程都有自己獨(dú)立的內(nèi)存空間,因此在子進(jìn)程之間共享數(shù)據(jù)需要借助一些特殊的工具。multiprocessing
模塊提供了 Value
和 Array
這兩種共享內(nèi)存對(duì)象,使開(kāi)發(fā)者能夠在進(jìn)程之間共享數(shù)據(jù)。
3.1 使用 Value
和 Array
Value
和 Array
是 multiprocessing
提供的兩種基礎(chǔ)共享內(nèi)存對(duì)象。Value
允許不同進(jìn)程共享單個(gè)值,而 Array
則用于共享一個(gè)數(shù)組。在創(chuàng)建這些對(duì)象時(shí),需要指定數(shù)據(jù)類型代碼,如 i
表示整數(shù),d
表示雙精度浮點(diǎn)數(shù)。
以下示例展示了如何在多個(gè)進(jìn)程之間共享和修改數(shù)據(jù):
from multiprocessing import Process, Value, Arraydef modify_data(val, arr):val.value += 1for i in range(len(arr)):arr[i] += 1if __name__ == "__main__":shared_val = Value('i', 0)shared_arr = Array('i', [0, 1, 2, 3])process = Process(target=modify_data, args=(shared_val, shared_arr))process.start()process.join()print("Value:", shared_val.value)print("Array:", list(shared_arr))
Value
和 Array
允許子進(jìn)程訪問(wèn)和修改數(shù)據(jù),并在主進(jìn)程中查看修改結(jié)果。shared_val
是一個(gè)共享的整數(shù)值,shared_arr
是一個(gè)共享的整數(shù)數(shù)組。modify_data
函數(shù)中對(duì)它們進(jìn)行了簡(jiǎn)單的加操作,并通過(guò) process.join()
確保主進(jìn)程在子進(jìn)程完成后再輸出最終結(jié)果。
3.2 使用 Manager
實(shí)現(xiàn)高級(jí)數(shù)據(jù)共享
除了 Value
和 Array
,multiprocessing
還提供了 Manager
對(duì)象,用于實(shí)現(xiàn)更復(fù)雜的數(shù)據(jù)共享,如字典和列表。Manager
提供了高層次的接口,允許多個(gè)進(jìn)程共享數(shù)據(jù)結(jié)構(gòu),如列表、字典等。
下面是一個(gè)簡(jiǎn)單示例,展示如何使用 Manager
實(shí)現(xiàn)共享列表的修改:
from multiprocessing import Process, Managerdef modify_shared_data(shared_list):shared_list.append('item')if __name__ == "__main__":with Manager() as manager:shared_list = manager.list()process = Process(target=modify_shared_data, args=(shared_list,))process.start()process.join()print("Shared list:", shared_list)
通過(guò) Manager
,我們可以創(chuàng)建一個(gè)共享的列表 shared_list
并傳遞給子進(jìn)程進(jìn)行修改。with Manager() as manager:
語(yǔ)句確保 Manager
對(duì)象在使用結(jié)束后自動(dòng)釋放資源。
4. 進(jìn)程鎖
在多進(jìn)程開(kāi)發(fā)中,數(shù)據(jù)競(jìng)爭(zhēng)是開(kāi)發(fā)者需要解決的一個(gè)重要問(wèn)題。當(dāng)多個(gè)進(jìn)程同時(shí)訪問(wèn)和修改同一資源時(shí),可能會(huì)導(dǎo)致數(shù)據(jù)不一致或出現(xiàn)不可預(yù)見(jiàn)的行為,特別是在涉及共享資源的情況下。這種現(xiàn)象被稱為競(jìng)態(tài)條件,它會(huì)導(dǎo)致程序出現(xiàn)錯(cuò)誤,難以復(fù)現(xiàn)和調(diào)試。為了防止數(shù)據(jù)競(jìng)爭(zhēng)的發(fā)生,multiprocessing
模塊提供了 Lock
對(duì)象,這是一種簡(jiǎn)單卻非常有效的同步機(jī)制,能夠確保在任意時(shí)刻,只有一個(gè)進(jìn)程可以訪問(wèn)共享資源。
Lock
的概念類似于現(xiàn)實(shí)生活中的鑰匙。只有持有鎖的進(jìn)程才能訪問(wèn)共享資源,而其他試圖獲取該資源的進(jìn)程必須等待鎖被釋放后才能繼續(xù)。通過(guò)這種方式,可以保證多個(gè)進(jìn)程在訪問(wèn)共享數(shù)據(jù)時(shí)不會(huì)發(fā)生沖突。
使用 Lock
的方式非常直觀。開(kāi)發(fā)者可以使用 with
語(yǔ)句來(lái)簡(jiǎn)化代碼,這樣在使用鎖的代碼塊執(zhí)行完畢后,鎖會(huì)自動(dòng)釋放。下面的示例展示了如何使用 Lock
來(lái)確保多個(gè)進(jìn)程訪問(wèn)共享資源時(shí)的同步性:
from multiprocessing import Process, Lockdef print_with_lock(lock, text):with lock:print(text)if __name__ == "__main__":lock = Lock()processes = [Process(target=print_with_lock, args=(lock, f'Process {i}')) for i in range(5)]for p in processes:p.start()for p in processes:p.join()
每個(gè)子進(jìn)程在執(zhí)行 print_with_lock
函數(shù)時(shí)都會(huì)嘗試獲取鎖。只有在鎖被獲取的情況下,print()
函數(shù)才會(huì)被執(zhí)行,從而保證了輸出的順序性和一致性。通過(guò) with lock:
語(yǔ)句,鎖在使用后會(huì)被自動(dòng)釋放,這樣其他等待的進(jìn)程就可以繼續(xù)運(yùn)行,避免了因忘記釋放鎖而導(dǎo)致的死鎖問(wèn)題。
4.1 更復(fù)雜的鎖應(yīng)用
在某些情況下,開(kāi)發(fā)者可能需要更復(fù)雜的同步控制。例如,當(dāng)多個(gè)進(jìn)程需要同時(shí)讀取共享資源但只有在寫(xiě)操作時(shí)需要鎖定資源時(shí),就需要一種更加靈活的鎖策略。Python 的 multiprocessing
模塊不僅提供了簡(jiǎn)單的 Lock
,還提供了其他高級(jí)鎖機(jī)制,如 RLock
(可重入鎖)和 Semaphore
(信號(hào)量)。
RLock
是一種可重入鎖,允許同一個(gè)進(jìn)程在持有鎖的情況下多次獲取鎖而不會(huì)陷入死鎖。它特別適用于遞歸調(diào)用的場(chǎng)景,當(dāng)遞歸調(diào)用中需要多次獲取鎖時(shí),普通的 Lock
會(huì)導(dǎo)致死鎖,而 RLock
可以解決這個(gè)問(wèn)題。以下是一個(gè)使用 RLock
的示例:
from multiprocessing import Process, RLockdef recursive_task(lock, depth):with lock:print(f'Depth {depth} acquired lock')if depth > 0:recursive_task(lock, depth - 1)if __name__ == "__main__":lock = RLock()process = Process(target=recursive_task, args=(lock, 3))process.start()process.join()
recursive_task
是一個(gè)遞歸函數(shù),每次遞歸調(diào)用都會(huì)嘗試獲取鎖。如果使用普通的 Lock
,在第二次獲取鎖時(shí)會(huì)導(dǎo)致死鎖。而使用 RLock
,同一進(jìn)程可以多次獲取鎖,從而順利完成任務(wù)。
4.2 鎖的死鎖與避免
使用鎖雖然能夠避免數(shù)據(jù)競(jìng)爭(zhēng),但也會(huì)引入死鎖的風(fēng)險(xiǎn)。死鎖是一種常見(jiàn)的并發(fā)問(wèn)題,當(dāng)兩個(gè)或多個(gè)進(jìn)程彼此等待對(duì)方釋放鎖時(shí),就會(huì)陷入死鎖,導(dǎo)致程序無(wú)法繼續(xù)執(zhí)行。
要避免死鎖,開(kāi)發(fā)者需要遵循一些基本原則,比如避免嵌套獲取鎖或者使用超時(shí)參數(shù)來(lái)獲取鎖。例如,可以使用 acquire()
方法來(lái)嘗試獲取鎖,并設(shè)置超時(shí)時(shí)間。如果在超時(shí)時(shí)間內(nèi)未獲取到鎖,程序可以選擇放棄或采取其他措施:
from multiprocessing import Process, Lock
import timedef safe_task(lock, name):if lock.acquire(timeout=2): # 嘗試在2秒內(nèi)獲取鎖try:print(f'Process {name} acquired the lock')time.sleep(1) # 模擬任務(wù)處理finally:lock.release()print(f'Process {name} released the lock')else:print(f'Process {name} could not acquire the lock')if __name__ == "__main__":lock = Lock()processes = [Process(target=safe_task, args=(lock, f'P{i}')) for i in range(3)]for p in processes:p.start()for p in processes:p.join()
每個(gè)進(jìn)程在嘗試獲取鎖時(shí)會(huì)等待最多 2 秒。如果超時(shí)仍未獲取到鎖,進(jìn)程將放棄嘗試并輸出提示。這樣可以防止進(jìn)程陷入長(zhǎng)時(shí)間等待,避免死鎖的發(fā)生。
4.3 信號(hào)量與條件變量
除了鎖機(jī)制,multiprocessing
模塊還提供了 Semaphore
和 Condition
對(duì)象,以實(shí)現(xiàn)更復(fù)雜的進(jìn)程同步。Semaphore
用于控制對(duì)共享資源的訪問(wèn)數(shù)量。例如,如果一個(gè)資源最多允許同時(shí)被兩個(gè)進(jìn)程訪問(wèn),就可以用一個(gè)計(jì)數(shù)為 2 的信號(hào)量來(lái)控制。
Condition
對(duì)象則用于實(shí)現(xiàn)復(fù)雜的條件同步,可以讓一個(gè)線程等待特定條件的滿足后再繼續(xù)執(zhí)行。以下是一個(gè)使用 Semaphore
的示例,演示如何限制同時(shí)訪問(wèn)共享資源的進(jìn)程數(shù):
from multiprocessing import Process, Semaphore
import timedef limited_access_task(semaphore, name):with semaphore: # 獲取信號(hào)量print(f'Process {name} is running')time.sleep(1)print(f'Process {name} has finished')if __name__ == "__main__":semaphore = Semaphore(2) # 最多允許兩個(gè)進(jìn)程同時(shí)訪問(wèn)processes = [Process(target=limited_access_task, args=(semaphore, f'P{i}')) for i in range(5)]for p in processes:p.start()for p in processes:p.join()
在這個(gè)例子中,同時(shí)只有兩個(gè)進(jìn)程能夠進(jìn)入 limited_access_task
中的臨界區(qū),確保了對(duì)共享資源的受控訪問(wèn)。
5. 進(jìn)程池
在實(shí)際應(yīng)用中,尤其是在需要處理大量任務(wù)的場(chǎng)景下,逐個(gè)創(chuàng)建和管理進(jìn)程是一個(gè)耗時(shí)且不夠高效的過(guò)程。multiprocessing
模塊通過(guò)提供 Pool
類,使開(kāi)發(fā)者能夠更加高效地管理多個(gè)并發(fā)任務(wù)。使用 Pool
,程序可以在后臺(tái)自動(dòng)管理進(jìn)程的創(chuàng)建、銷毀和任務(wù)分發(fā),讓開(kāi)發(fā)者更專注于業(yè)務(wù)邏輯的實(shí)現(xiàn)。
Pool
類的核心思想是創(chuàng)建一個(gè)預(yù)定義數(shù)量的工作進(jìn)程池,所有的任務(wù)都在這些進(jìn)程中并行執(zhí)行。當(dāng)任務(wù)數(shù)量超過(guò)可用進(jìn)程數(shù)時(shí),Pool
會(huì)將多余的任務(wù)排隊(duì),待有空閑進(jìn)程時(shí)再繼續(xù)執(zhí)行。這樣,開(kāi)發(fā)者無(wú)需手動(dòng)跟蹤每個(gè)進(jìn)程的狀態(tài),從而簡(jiǎn)化了代碼編寫(xiě)和維護(hù)。
在 multiprocessing.Pool
中,最常用的方法之一是 map()
。map()
方法會(huì)將輸入的可迭代對(duì)象中的每個(gè)元素傳遞給目標(biāo)函數(shù),并在多個(gè)進(jìn)程中并行執(zhí)行。以下示例展示了如何使用 map()
方法來(lái)并行計(jì)算一組數(shù)的平方:
from multiprocessing import Pooldef square(x):return x * xif __name__ == "__main__":with Pool(4) as pool: # 創(chuàng)建包含4個(gè)進(jìn)程的進(jìn)程池results = pool.map(square, [1, 2, 3, 4, 5])print("Squared results:", results)
在這個(gè)示例中,Pool
會(huì)根據(jù)指定的進(jìn)程數(shù)(此處為 4)創(chuàng)建工作進(jìn)程,并將輸入數(shù)據(jù) [1, 2, 3, 4, 5]
中的每個(gè)元素分配給其中的一個(gè)進(jìn)程來(lái)進(jìn)行計(jì)算。map()
會(huì)在所有任務(wù)完成后將結(jié)果以列表形式返回給主進(jìn)程。
5.1 imap與starmap
除了 map()
方法,Pool
還提供了其他強(qiáng)大的方法來(lái)滿足不同的需求。例如,當(dāng)任務(wù)的分發(fā)和收集結(jié)果需要更精細(xì)的控制時(shí),imap()
方法會(huì)更加合適。與 map()
不同,imap()
會(huì)以迭代器的形式返回結(jié)果,這樣當(dāng)結(jié)果生成時(shí)主進(jìn)程可以立即消費(fèi),而無(wú)需等待所有任務(wù)結(jié)束。以下代碼演示了如何使用 imap()
:
from multiprocessing import Pool
import timedef slow_square(x):time.sleep(1) # 模擬耗時(shí)任務(wù)return x * xif __name__ == "__main__":with Pool(4) as pool:for result in pool.imap(slow_square, [1, 2, 3, 4, 5]):print("Result received:", result)
在這個(gè)示例中,slow_square()
函數(shù)模擬了一個(gè)較為耗時(shí)的計(jì)算任務(wù)。imap()
方法在任務(wù)完成時(shí)會(huì)逐個(gè)返回結(jié)果,因此在每個(gè)結(jié)果生成后,主進(jìn)程可以立即處理它。這樣可以提高程序的響應(yīng)速度,尤其在需要逐步處理結(jié)果的場(chǎng)景下非常實(shí)用。
starmap()
方法是 map()
的變體,它允許將多個(gè)參數(shù)傳遞給目標(biāo)函數(shù)。對(duì)于接受多個(gè)參數(shù)的函數(shù),starmap()
可以將輸入解包后分配給目標(biāo)函數(shù)。例如,有一個(gè)計(jì)算兩數(shù)相加的函數(shù),我們希望并行執(zhí)行多個(gè)二元計(jì)算任務(wù):
from multiprocessing import Pooldef add(a, b):return a + bif __name__ == "__main__":with Pool(4) as pool:results = pool.starmap(add, [(1, 2), (3, 4), (5, 6), (7, 8)])print("Addition results:", results)
starmap()
會(huì)將輸入的每個(gè)元組 (a, b)
解包,并分別傳遞給目標(biāo)函數(shù) add()
,從而實(shí)現(xiàn)對(duì)多個(gè)參數(shù)的并行處理。
5.2 apply與apply_async
如果需要在任務(wù)之間進(jìn)行更靈活的控制,apply()
和 apply_async()
方法也非常有用。apply()
會(huì)阻塞主進(jìn)程直到任務(wù)執(zhí)行完成,而 apply_async()
則會(huì)異步執(zhí)行任務(wù)并返回 AsyncResult
對(duì)象,該對(duì)象可以用于檢查任務(wù)的狀態(tài)或獲取結(jié)果:
from multiprocessing import Pool
import timedef delayed_square(x):time.sleep(2)return x * xif __name__ == "__main__":with Pool(2) as pool:result = pool.apply_async(delayed_square, (4,))print("Task submitted.")# 檢查任務(wù)是否完成while not result.ready():print("Waiting for result...")time.sleep(1)print("Result received:", result.get())
apply_async()
提交任務(wù)后會(huì)立即返回,而主進(jìn)程可以繼續(xù)執(zhí)行其他任務(wù)或通過(guò) ready()
方法檢查任務(wù)的完成狀態(tài)。通過(guò) get()
方法可以獲取任務(wù)結(jié)果并阻塞,直到結(jié)果可用為止。