No multiprocessing
We'll use an example that does not use multiprocessing as the baseline for the rest of this post.
import time
import multiprocessing

def do_something():
    print('Sleeping 1 second(s)...')
    time.sleep(1)
    print('Done sleeping...')

def no_multiprocessing():
    for _ in range(10):
        do_something()

if __name__ == '__main__':
    start = time.perf_counter()
    no_multiprocessing()
    end = time.perf_counter()
    print(f'Finished in {round(end - start, 2)} second(s)')
We call do_something in a loop ten times and look at the output and the timing.
Sleeping 1 second(s)...
Done sleeping...
Sleeping 1 second(s)...
Done sleeping...
Sleeping 1 second(s)...
Done sleeping...
Sleeping 1 second(s)...
Done sleeping...
Sleeping 1 second(s)...
Done sleeping...
Sleeping 1 second(s)...
Done sleeping...
Sleeping 1 second(s)...
Done sleeping...
Sleeping 1 second(s)...
Done sleeping...
Sleeping 1 second(s)...
Done sleeping...
Sleeping 1 second(s)...
Done sleeping...
Finished in 10.11 second(s)
Process finished with exit code 0
As you can see, the program runs sequentially; the ten iterations take 10.11 seconds in total.
Next, let's add basic multiprocessing.
Using the multiprocessing module
Add a function that uses multiprocessing. We'll start with two processes and look at the output.
def with_multiprocessing():
    # Pass the function itself to target, without trailing parentheses:
    # we don't want to call it here, just hand the function object to target.
    p1 = multiprocessing.Process(target=do_something)
    p2 = multiprocessing.Process(target=do_something)

    p1.start()
    p2.start()
Finished in 0.03 second(s)
Sleeping 1 second(s)...
Sleeping 1 second(s)...
Done sleeping...
Done sleeping...
The timing looks odd: it's even shorter than the sleep inside do_something. That's because with the code above, as soon as with_multiprocessing() has started the two processes it returns, and execution moves straight on to end = time.perf_counter(). The measured time is therefore roughly just the time it took to create the two processes. To include the processes' actual run time in the measurement, we need to call join(). The modified code:
def with_multiprocessing():
    p1 = multiprocessing.Process(target=do_something)
    p2 = multiprocessing.Process(target=do_something)

    p1.start()
    p2.start()

    p1.join()
    p2.join()
Now the output is:
Sleeping 1 second(s)...
Sleeping 1 second(s)...
Done sleeping...
Done sleeping...
Finished in 2.05 second(s)
The run takes a bit over 2 seconds, about 1 second longer than the ideal parallel time: creating and starting processes also consumes some time.
Let's add a few more processes and check the timing. My machine has 4 cores, so we'll bump the process count to 4.
Change with_multiprocessing() to:
def with_multiprocessing():
    processes_list = []
    for _ in range(4):
        p = multiprocessing.Process(target=do_something)
        p.start()
        processes_list.append(p)

    for process in processes_list:
        process.join()
Note that the join() calls need their own for loop. If each start() were immediately followed by a join(), the parent would block until that process finished before starting the next one, which is no different from running serially.
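The difference can be sketched side by side (a minimal illustration with a hypothetical helper name, not code from the post):

```python
import time
import multiprocessing

def nap():
    time.sleep(1)

def serialized():
    # start() immediately followed by join() waits for each process
    # before the next one is created: effectively serial, roughly 4 s
    for _ in range(4):
        p = multiprocessing.Process(target=nap)
        p.start()
        p.join()

def parallel():
    procs = []
    for _ in range(4):
        p = multiprocessing.Process(target=nap)
        p.start()          # start all four first...
        procs.append(p)
    for p in procs:
        p.join()           # ...then wait for them together, roughly 1 s

if __name__ == '__main__':
    for fn in (serialized, parallel):
        t0 = time.perf_counter()
        fn()
        print(f'{fn.__name__}: {time.perf_counter() - t0:.2f} s')
```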
The output of the modified with_multiprocessing() is:
Sleeping 1 second(s)...
Sleeping 1 second(s)...
Sleeping 1 second(s)...
Sleeping 1 second(s)...
Done sleeping...
Done sleeping...
Done sleeping...
Done sleeping...
Finished in 2.86 second(s)
The total is just under 3 seconds, rather than the roughly 4 seconds a serial run would take, so the processes really are running in parallel.
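As an aside, the example only ever runs a no-argument target; multiprocessing.Process can also pass arguments to the target via the args tuple. A minimal sketch (the sleep_for helper is hypothetical, not from the post):

```python
import time
import multiprocessing

def sleep_for(seconds):
    print(f'Sleeping {seconds} second(s)...')
    time.sleep(seconds)

if __name__ == '__main__':
    processes = []
    for sec in (1, 2):
        # args must be a tuple, hence the trailing comma
        p = multiprocessing.Process(target=sleep_for, args=(sec,))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
```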
Using concurrent.futures for parallelism
The multiprocessing module gets the job done, but the code is somewhat verbose. Let's redo this with the concurrent.futures module, introduced in Python 3.2. The multiprocessing module is no longer needed now, so the import multiprocessing line can be removed.
At the top of the file, import the module:
import concurrent.futures
To launch a single process, the code can look like this:
def with_concurrent():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        result = executor.submit(do_something)
        print(result.result())
Now look at the output:
Sleeping 1 second...
Done sleeping...
None
Finished in 2.26 second(s)
Note the None in the output. submit() returns a Future, and result() gives back whatever the submitted function returned. do_something only prints and has no return statement, so like any Python function without one it returns None, and that None is what gets printed. The following snippet works the same way:
>>> [print(i) for i in range(4)]
Before reading on, stop and think about what the output should be.
It looks like this:
0
1
2
3
[None, None, None, None]
The printed digits are no surprise; the trailing [None, None, None, None] is the list of print's return values, and print returns None by default.
So let's change do_something's final print into a return, and while we're at it, modify it to accept the sleep duration as a parameter. The resulting function:
def do_something_for_concurrent(seconds=1):
    print(f'Sleeping {seconds} second(s)...')
    time.sleep(seconds)
    return f'Done sleeping {seconds} second(s)...'
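With a function that actually returns a value, the Future from submit() now carries that string instead of None. A quick self-contained sketch (redefining the function above so it runs on its own):

```python
import concurrent.futures
import time

def do_something_for_concurrent(seconds=1):
    print(f'Sleeping {seconds} second(s)...')
    time.sleep(seconds)
    return f'Done sleeping {seconds} second(s)...'

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        future = executor.submit(do_something_for_concurrent, 1)
        # result() blocks until the worker finishes, then returns the value
        print(future.result())  # Done sleeping 1 second(s)...
```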
Now create several processes inside with_concurrent:
def with_concurrent():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = [executor.submit(do_something_for_concurrent, sec) for sec in range(1, 10)]

        for f in concurrent.futures.as_completed(results):
            print(f.result())
Here all the tasks are submitted at once via the list comprehension, and then

for f in concurrent.futures.as_completed(results):
    print(f.result())

pulls each result out as its task completes.
The output:
Sleeping 1 second(s)...
Sleeping 2 second(s)...
Sleeping 3 second(s)...
Sleeping 4 second(s)...
Sleeping 5 second(s)...
Done sleeping 1 second(s)...
Sleeping 6 second(s)...
Done sleeping 2 second(s)...
Sleeping 7 second(s)...
Done sleeping 3 second(s)...
Done sleeping 4 second(s)...
Sleeping 8 second(s)...
Sleeping 9 second(s)...
Done sleeping 5 second(s)...
Done sleeping 6 second(s)...
Done sleeping 7 second(s)...
Done sleeping 8 second(s)...
Done sleeping 9 second(s)...
Finished in 16.18 second(s)
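The staggered "Sleeping …" lines above appear because the pool only runs a limited number of tasks at a time (max_workers, which by default is based on the machine's processor count); the remaining submissions wait in the pool's queue. A small sketch pinning the worker count explicitly (the helper and the value 2 are just for illustration):

```python
import concurrent.futures
import time

def nap(seconds):
    time.sleep(seconds)
    return seconds

if __name__ == '__main__':
    start = time.perf_counter()
    # with only 2 workers, four 1-second tasks run in two waves
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        results = list(executor.map(nap, [1, 1, 1, 1]))
    elapsed = time.perf_counter() - start
    print(results, f'{elapsed:.2f} s')  # roughly 2 s, not 1 s or 4 s
```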
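Notice also that the "Done sleeping" lines above come back in completion order, not submission order: that's the point of as_completed, which yields each Future as soon as it finishes. A tiny deterministic sketch (hypothetical helper, not from the post):

```python
import concurrent.futures
import time

def nap_and_return(seconds):
    time.sleep(seconds)
    return seconds

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        # submitted as 1.2 s then 0.3 s, but the 0.3 s task finishes first
        futures = [executor.submit(nap_and_return, s) for s in (1.2, 0.3)]
        done_order = [f.result() for f in concurrent.futures.as_completed(futures)]
        print(done_order)  # [0.3, 1.2]
```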
This is still somewhat verbose. When we have a list and apply the same operation to every element, the natural tool is the higher-order map function, for example:
>>> list(map(lambda x:x**2,[1,2,3]))
[1, 4, 9]
concurrent.futures executors provide a map method of their own. Change with_concurrent to:
def with_concurrent():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        secs = [i for i in range(1, 10)]
        results = executor.map(do_something_for_concurrent, secs)

        for result in results:
            print(result)
Run it again and check the result:
Sleeping 1 second(s)...
Sleeping 2 second(s)...
Sleeping 3 second(s)...
Sleeping 4 second(s)...
Sleeping 5 second(s)...
Done sleeping 1 second(s)...
Sleeping 6 second(s)...
Done sleeping 2 second(s)...
Sleeping 7 second(s)...
Done sleeping 3 second(s)...
Sleeping 8 second(s)...
Done sleeping 4 second(s)...
Sleeping 9 second(s)...
Done sleeping 5 second(s)...
Done sleeping 6 second(s)...
Done sleeping 7 second(s)...
Done sleeping 8 second(s)...
Done sleeping 9 second(s)...
Finished in 16.82 second(s)
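Note one difference from the as_completed version: here the "Done sleeping" lines come out strictly in order 1 through 9, because executor.map yields results in the order of its inputs, even when a later task finishes first. A minimal sketch of that property (hypothetical helper, not from the post):

```python
import concurrent.futures
import time

def delayed_square(x):
    # later inputs sleep less, so they tend to finish first
    time.sleep(0.3 - 0.1 * x)
    return x * x

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # map still yields results in input order, regardless of finish order
        print(list(executor.map(delayed_square, [0, 1, 2])))  # [0, 1, 4]
```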
Finally, here is the complete code file:
import time
import multiprocessing
import concurrent.futures

def do_something_for_concurrent(seconds=1):
    print(f'Sleeping {seconds} second(s)...')
    time.sleep(seconds)
    return f'Done sleeping {seconds} second(s)...'

def do_something():
    print('Sleeping 1 second...')
    time.sleep(1)
    print('Done sleeping...')

def no_multiprocessing():
    for _ in range(8):
        do_something()

def with_concurrent():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        secs = [i for i in range(1, 10)]
        results = executor.map(do_something_for_concurrent, secs)

        for result in results:
            print(result)

def with_multiprocessing():
    processes_list = []
    for _ in range(16):
        p = multiprocessing.Process(target=do_something)
        p.start()
        processes_list.append(p)

    for process in processes_list:
        process.join()

if __name__ == '__main__':
    start = time.perf_counter()
    # no_multiprocessing()
    # with_multiprocessing()
    with_concurrent()
    end = time.perf_counter()
    print(f'Finished in {round(end - start, 2)} second(s)')