今天讲讲我在实习中学到的一点 python 知识,核心内容是多进程,也即我们常说的并行计算。
map
首先提个问题,给出一个列表,对列表中的每个元素都平方,代码怎么写?
最简单直观的方法自然就是 for 循环。
alist = [1,2,3,4,5,6,7,8]
def power_value(num):
return num**2
result_list = []
for num in alist:
result_list.append(power_value(num))
print(result_list)
>>> [1, 4, 9, 16, 25, 36, 49, 64]
注:当然你也可以不定义函数直接循环,这里主要是为了方便开展下文。
学过列表推导式的同学可以写出更加简洁的方案:
result_list = [power_value(x) for x in alist]
print(result_list)
>>> [1, 4, 9, 16, 25, 36, 49, 64]
但其实,除了上面两种方法外,python 还内置了一个函数 map
,专门解决这种不断往同一个函数传不同参数的问题。它的语法也很简单:
map(function, iterable, ...)
第一个参数为函数名,第二个参数和之后的参数均为序列(列表、元组、range()等等)。第一个参数 function 以参数序列中的每一个元素调用 function 函数,返回包含每次 function 函数返回值的新列表。有了这个函数,上面的代码就可以改写为:
result_list = list(map(power_value,alist))
print(result_list)
>>> [1, 4, 9, 16, 25, 36, 49, 64]
这里为什么要加一个 list()
呢?因为 map 返回的是一个迭代器,一般情况下只能循环遍历读取,所以加个 list 把它转换为列表。
上面的函数只有一个参数,那假如它有两个参数呢?我们分为两种情况,一是这两个参数是成对出现的,比如说,两个列表相加,就是列表对应的元素相加。这种情况我们也可以直接应用 map。
alist = [1,2,3,4,5,6,7,8]
blist = [2,3,4,5,6,7,8,9]
def add_value(a,b):
return a + b
result_list = list(map(add_value,alist,blist))
print(result_list)
>>> [3, 5, 7, 9, 11, 13, 15, 17]
第二种情况,这两个参数并不对应,常见情况是需要固定一个参数,比如上面的加法函数,我需要固定 b=5
,这时候,可以借助 python 提供的另外一个函数 partial
,这是 python 自带包 functools
里的函数,它的作用就是固定某些参数,从而构造出一个新函数。其语法为:
partial(func, param, ……)
这里有个容易忽略的点,如果不指定参数位置的话,那么默认从第一个参数开始固定,所以使用它的时候最好指定参数。但是被指定的参数必须在函数的最后部分,比如 func01(a,b,c)
,我们可以固定 b 和 c,可以固定 c,但是绝对不能只固定 b!否则会报错。
from functools import partial
alist = [1,2,3,4,5,6,7,8]
def add_value(a,b):
return (a + b) * a
func = partial(add_value,b = 5)
result_list = list(map(func,alist))
print(result_list)
>>> [6, 14, 24, 36, 50, 66, 84, 104]
多进程
常被和进程一起提起的是线程,不过我也没搞懂这两的具体原理。总之明白两个区别就好:
- 进程之前数据不共享,线程之间数据共享
- 进程之间不会相互影响,一个进程挂掉另外一个进程照常工作。而一个线程挂掉则所有线程都会挂掉。
我在工作中常用到的是多进程,所以主要讲讲它。
python 自带了一个多进程库 multiprocessing
,利用上面学到的 map 知识,我们可以很容易实现并行运算,有了它,上面的代码可以改写如下:
import multiprocessing
from functools import partial
alist = [1,2,3,4,5,6,7,8]
def add_value(a,b):
return (a + b) * a
func = partial(add_value,b = 5)
if __name__ == '__main__':
num_processes = multiprocessing.cpu_count()-2 # 使用核心数
pool = multiprocessing.Pool(processes=num_processes) # 实例化进程池
func = partial(add_value,b = 5)
print(pool.map(func,alist))
这样就实现了一个最简单的多进程,可以发现,和 map 相比就多了一个实例化进程池的过程。需要注意的是,这里开启多进程,肯定是要比单进程速度慢的,因为系统在进程间分配任务也是需要时间的,所以我们也不能无脑开多进程,大型任务才可能需要。
另外,pool.map
返回的是一个列表,如果想像 map 那样返回迭代器的话,可以使用 pool.imap
,这两个函数返回都是有序的,如果有特殊需求想返回无须结果,可以使用 imap_unordered
最重要的一点,多进程必须在 if __name__ == '__main__'
下写!否则会报错,当然了,你在函数里使用多进程,然后在 if __name__ == '__main__'
下调用这个函数也是允许的。
这是 Windows 上多进程的实现问题。在 Windows 上,子进程会自动 import 启动它的这个文件,而在 import 的时候是会执行这些语句的。如果你这么写的话就会无限递归创建子进程报错。所以必须把创建子进程的部分用那个 if 判断保护起来, import 的时候 name 不是 main,就不会递归运行了。
进度条
在执行大型任务的时候,我一般是输出一个中间结果就 print 一条提示语句,这样可以让我知道程序仍然在正常运行,而且也能大致知道处理到哪了。不过,如果只是上面两点需求的话,其实可以使用进度条,这样更加方便直观。最常见的进度条库是 tqdm
,pip 安装好后就可以使用了,语法也很简单。
from tqdm import tqdm,trange
for i in trange(100):
time.sleep(0.1)
for i in tqdm(range(100)):
time.sleep(0.1)
不过问题在于,多进程下如何使用进度条?自然,可以为每个进程生成一个进度条,但是我想看总的任务进度,而不是展示 5、6 根进度,怎么做?针对这个需求,我借鉴 Lei Mao,重写了 run_imap_mp
函数,用法和 map 基本一样,其他参数注释里也有说明,这里就不细说了。
def run_imap_mp(func, argument_list, num_processes='', is_tqdm=True):
'''
多进程与进度条结合
param:
------
func:function
函数
argument_list:list
参数列表
num_processes:int
进程数,不填默认为总核心-3
is_tqdm:bool
是否展示进度条,默认展示
'''
result_list_tqdm = []
try:
import multiprocessing
if num_processes == '':
num_processes = multiprocessing.cpu_count()-3
pool = multiprocessing.Pool(processes=num_processes)
if is_tqdm:
from tqdm import tqdm
for result in tqdm(pool.imap(func=func, iterable=argument_list), total=len(argument_list)):
result_list_tqdm.append(result)
else:
for result in pool.imap(func=func, iterable=argument_list):
result_list_tqdm.append(result)
pool.close()
except:
result_list_tqdm = list(map(func,argument_list))
return result_list_tqdm
版权属于:作者名称
本文链接:https://www.sitstars.com/archives/108/
转载时须注明出处及本声明