- Published on
Python 避免进程池空闲,实现单个任务(non-batch)立即出列入列
- Authors
- Name
- Wang Zhiwei
用途:
初始化一个进程池,当进程池没有满且有剩余等待执行的任务时,自动往池里新增一个任务,即每完成一个任务立即新增一个任务,而不是等待一批任务全部完成再新增一批任务,达到高效利用进程池的目的。
一个简单的例子
import os
import random
import time
from concurrent.futures import FIRST_COMPLETED, wait
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Queue
def runner(N):
sleeptime = random.randint(1, 10)
time.sleep(sleeptime)
print(f"{os.getpid()}\ttask {N} finished")
tasks = list(range(10))
workers = 3
wait_time = 1
if __name__ == "__main__":
with ProcessPoolExecutor(max_workers=workers) as executor:
rq = dict() # non-process-safe queue
tq = Queue(maxsize=len(tasks))
for task in tasks:
tq.put_nowait(task)
# main process polling and dispatching tasks
while not tq.empty():
if len(rq) < workers:
N = tq.get()
future = executor.submit(runner, N=N)
rq[future] = N
print(f"task {N} submitted")
else:
done, not_done = wait(rq, timeout=wait_time, return_when=FIRST_COMPLETED)
if not done:
print(f"Waiting for tasks to finish")
continue
for future in done:
del rq[future]
print(f"task {future} removed from rq")
print("All tasks done")
执行用例
task 0 submitted
task 1 submitted
task 2 submitted
Waiting for tasks to finish
Waiting for tasks to finish
74421 task 0 finished
task <Future at 0x102d478b0 state=finished returned NoneType> removed from rq
task 3 submitted
Waiting for tasks to finish
74422 task 1 finished
74423 task 2 finished
task <Future at 0x102d598e0 state=finished returned NoneType> removed from rq
task 4 submitted
task <Future at 0x102d599d0 state=finished returned NoneType> removed from rq
task 5 submitted
Waiting for tasks to finish
74421 task 3 finished
task <Future at 0x102d59e20 state=finished returned NoneType> removed from rq
task 6 submitted
74422 task 4 finished
task <Future at 0x102d478b0 state=finished returned NoneType> removed from rq
task 7 submitted
Waiting for tasks to finish
74423 task 5 finished
task <Future at 0x102d59c70 state=finished returned NoneType> removed from rq
task 8 submitted
Waiting for tasks to finish
Waiting for tasks to finish
74422 task 7 finished
task <Future at 0x102d59880 state=finished returned NoneType> removed from rq
task 9 submitted
74423 task 8 finished
74421 task 6 finished
74422 task 9 finished
All tasks done