Python多线程并行计算

本文主要介绍了在Python中实现并行计算的办法。

问题场景

最简单的场景:
同一个函数,需要在不同的输入下分别运行,此时即可使用并行计算方法。

参考文献
Python 多进程 multiprocessing.Pool类详解
Python 多核并行计算

模块与类

此部分介绍了两种多线程的方法,分别为multiprocessing.Process()multiprocessing.Pool()
他们之间的区别为:

  • 任务与进程的对应关系,Process()一个任务对应一个进程,但Pool()并没有准确的对应关系,每来一个任务,如果进程池没满,就会创建一个进程执行;如果进程池满了,则会等待池中进程结束再来执行当前任务。
    • 由此导致在进程数量管理上,Pool()Process()更方便,不需要手动为任务分配进程。

multiprocessing.Process()

构造方法

1
__init__(self, group=None, target=None, name=None, args=(), kwargs={})

目前需要注意的是,target表示调用对象,如函数名,args是函数的输入参数。

借用参考文献中的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#coding=utf-8
import multiprocessing

def do(n) :
#获取当前线程的名字
name = multiprocessing.current_process().name
print name,'starting'
print "worker ", n
return

if __name__ == '__main__' :
numList = []
for i in xrange(5) :
p = multiprocessing.Process(target=do, args=(i,))
numList.append(p)
p.start()
p.join()
print "Process end."

p.start(),启动进程,p.join(),等待当前进程结束之后再继续,常用于进程间的同步。

multiprocessing.Pool()

流程:

1
2
3
4
pool = Pool(5)
rl =pool.map(func, Para_inter) # 可以设置左值,对于函数没有返回值的情况
pool.close()
pool.join()

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import time
from multiprocessing import Pool
def run(fn):
#fn: 函数参数是数据列表的一个元素
time.sleep(1)
return fn*fn

if __name__ == "__main__":
testFL = [1,2,3,4,5,6]
print 'shunxu:' #顺序执行(也就是串行执行,单进程)
s = time.time()
for fn in testFL:
run(fn)

e1 = time.time()
print "顺序执行时间:", int(e1 - s)

print 'concurrent:' #创建多个进程,并行执行
pool = Pool(5) #创建拥有5个进程数量的进程池
#testFL:要处理的数据列表,run:处理testFL列表中数据的函数
rl =pool.map(run, testFL)
pool.close()#关闭进程池,不再接受新的进程
pool.join()#主进程阻塞等待子进程的退出
e2 = time.time()
print "并行执行时间:", int(e2-e1)
print rl

1
map(func, iterable[, chunksize=None])

func为函数名,后面的迭代器中为函数参数。map的赋值对象表示返回值的列表,等所有进程执行完毕之后才回返回。顺序不一定与输入顺序一致。

1
close()

关闭进程池,不接受新的任务,已经接受的任务会继续进行

1
terminate()

结束所有的进程

1
join()

主进程阻塞等待子进程的退出,然后再执行后面的语句,join方法必须在close或terminate之后使用。

如果函数需要传递多个参数,则需要分别构建每个参数的列表,zip,并且重新定义函数为单一变量函数,将所有变量作为一个tuple传入:

1
2
3
4
def f_zipped(para):
f(para[0]...)

pool.map(f_zipped,zip(paralist1,paralist2...))

注意事项

Pool和Process都是多进程,擅长于处理计算任务,如果IO任务多,则需要使用多线程。

1
2
from multiprocessing.dummy import Pool as ThreadPool
pool=ThreadPool(5)

其他类似。需要注意的有2点:

  • 不可直接通过multiprocessing.dummy.Pool来创建对象,必须通过import的方式才行
  • 所有的线程都在一个进程之内,仅适合多IO任务,不适合多计算任务

同时,opencv和Image类不适合多进程,进程开多了容易报错,目前暂无解决的办法。