multiprocessing进程池与回调函数

一、研究背景

​ 在利用python库multiprocessing进行多进程开发时,利用了进程池及apply_async异步非阻塞进行函数调用时,为了代码逻辑清晰以及代码块功能最小化,下一步的调用使用到了第一次apply_async的结果,将两个异步调用执行变成了同步执行,导致异步1min可以执行完成的程序变成了10min才执行完成,将优点变成了缺点。

​ 这里将对源代码进行分析,并进行修改。

原本代码类似如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from multiprocessing import Pool

def getA(info):
info['test1'] = 'test1'
info['test2'] = 'test2'
return info

def getB(info):
a = {}
a['test'] = info['test1'] + '_' + info['test2']
print(a)
return a

if __name__ == '__main__':
try:
msg = 'success'
pool = Pool(4)
infoList = [{'a': 'a1', 'b': 'b1'}, {'a': 'a2', 'b': 'b2'}]
for info in infoList:
basicinfo = pool.apply_async(getA, (info, ))
baseinfo = basicinfo.get()
pool.apply_async(getB, (baseinfo, ))
pool.close()
pool.join()
print(msg)
except Exception as err:
print(err)

执行结果:

1
2
3
{'test': 'test1_test2'}
{'test': 'test1_test2'}
success

二、原因分析

​ apply_async属于异步非阻塞模式,原本的代码使用了两次pool.apply_async(),并且第二次的apply_async需要利用第一次apply_async的结果,这就表示本来应该是两次异步同一时间执行两个函数,现在第二个函数需要等待第一个函数执行完之后再执行,产生了等待时间,从而将程序执行时长扩大。以下通过上述getA和getB示例详细描述原因:

​ 上图为apply_async方法适合用的场景,该场景两个调用函数并不关心对方的结果,getA与getB函数可以同时执行,到了t2时刻执行时间最长的getA函数执行结束,则该程序执行结束。

​ 而本次的需求实际为上图所示,虽然都使用了apply_async的异步模式,但getB函数需接收getA执行完成的结果,导致getB函数并没有和getA同时开始执行,getB从t1时刻就开始等待getA的执行结果,直到t2时刻getA执行完成返回结果之后,getB才开始执行,直到t3时刻执行结束。

三、处理方法

1、合并调用函数

​ 该方法是将底层的getA和getB两个函数,合并为一个函数getC执行,之后使用一次apply_async执行即可。修改代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from multiprocessing import Pool

def getA(info):
info['test1'] = 'test1'
info['test2'] = 'test2'
return info

def getB(info):
a = {}
a['test'] = info['test1'] + '_' + info['test2']
return a

# 可以添加一个合并了getA和getB函数的getC
def getC(info):
info_a = getA(info)
info_b = getB(info_a)
print(info_b)
return info_b

if __name__ == '__main__':
try:
msg = 'success'
pool = Pool(4)
infoList = [{'a': 'a1', 'b': 'b1'}, {'a': 'a2', 'b': 'b2'}]
for info in infoList:
basicinfo = pool.apply_async(getC, (info, )) # 直接调用getC
pool.close()
pool.join()
print(msg)
except Exception as err:
print(err)

执行结果:

1
2
3
{'test': 'test1_test2'}
{'test': 'test1_test2'}
success

2、使用回调函数

​ 另外一种方式是使用apply_async所包含的回调函数,回调函数的格式为apply_async(func1, args=(input, ), callback=func2),意思就是将func1函数的返回传递给func2,并且由主进程继续执行func2函数,修改代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from multiprocessing import Pool
import os

def getA(info):
info['test1'] = 'test1'
info['test2'] = 'test2'
print('getA pid', os.getpid())
return info

def getB(info):
a = {}
a['test'] = info['test1'] + '_' + info['test2']
print(a)
print('getB pid', os.getpid())
return a

if __name__ == '__main__':
try:
print("主进程pid:", os.getpid())
msg = 'success'
pool = Pool(4)
infoList = [{'a': 'a1', 'b': 'b1'}, {'a': 'a2', 'b': 'b2'}]
for info in infoList:
basicinfo = pool.apply_async(getA, args=(info, ), callback=getB) # 将getB看为回调函数
pool.close()
pool.join()
print(msg)
except Exception as err:
print(err)

执行结果:

1
2
3
4
5
6
7
8
9
10
主进程pid: 20006
getA pid 20007
getA pid 20008
{'test': 'test1_test2'}
getB pid 20006
{'test': 'test1_test2'}
getB pid 20006
success

# 可以看到所有的getB函数都由主进程来完成