Without multiprocessing

As a baseline for everything that follows, let's start with an example that does not use multiprocessing.

import time
import multiprocessing

def do_something():
    print('Sleeping 1 second(s)...')
    time.sleep(1)
    print('Done sleeping...')

def no_multiprocessing():
    for _ in range(10):
        do_something()

if __name__ == '__main__':
    start = time.perf_counter()
    no_multiprocessing()
    end = time.perf_counter()
    print(f'Finished in {round(end - start, 2)} second(s)')

We call do_something in a loop ten times and look at the output and the elapsed time.

Sleeping 1 second(s)...
Done sleeping...
Sleeping 1 second(s)...
Done sleeping...
Sleeping 1 second(s)...
Done sleeping...
Sleeping 1 second(s)...
Done sleeping...
Sleeping 1 second(s)...
Done sleeping...
Sleeping 1 second(s)...
Done sleeping...
Sleeping 1 second(s)...
Done sleeping...
Sleeping 1 second(s)...
Done sleeping...
Sleeping 1 second(s)...
Done sleeping...
Sleeping 1 second(s)...
Done sleeping...
Finished in 10.11 second(s)

Process finished with exit code 0

As you can see, the program runs sequentially: ten iterations take 10.11 seconds in total.
Next, let's add basic multiprocessing.

Using the multiprocessing module

Add a function that uses multiprocessing. First we'll create two processes and look at the output.

def with_multiprocessing():
    # The target argument takes the function itself, without parentheses:
    # we don't want to call the function here, we just pass a reference
    # to it so the new process can call it.
    p1 = multiprocessing.Process(target=do_something)
    p2 = multiprocessing.Process(target=do_something)

    p1.start()
    p2.start()

The output is:

Finished in 0.03 second(s)
Sleeping 1 second(s)...
Sleeping 1 second(s)...
Done sleeping...
Done sleeping...

The elapsed time looks strange: it is even shorter than the sleep inside do_something. This happens because start() does not wait for the processes to finish: once with_multiprocessing() returns, execution immediately moves on to end = time.perf_counter(), so what we measured is essentially just the time needed to create the two processes. To include the processes' own run time in the measurement, we need to call join(), which blocks until a process has finished. The modified code:

def with_multiprocessing():
    p1 = multiprocessing.Process(target=do_something)
    p2 = multiprocessing.Process(target=do_something)

    p1.start()
    p2.start()

    p1.join()
    p2.join()
The output now is:

Sleeping 1 second(s)...
Sleeping 1 second(s)...
Done sleeping...
Done sleeping...
Finished in 2.05 second(s)

The two processes finished in a bit over 2 seconds, about 1 second more than the sleeps themselves: creating and starting processes also costs some time.
Let's add a few more processes and time it again. My machine has 4 cores, so we'll raise the count to 4.

Change with_multiprocessing() to:

def with_multiprocessing():

    processes_list = []
    for _ in range(4):
        p = multiprocessing.Process(target=do_something)
        p.start()
        processes_list.append(p)

    for process in processes_list:
        process.join()

Note that the join() calls go in a separate for loop of their own. If join() immediately followed start() inside the same loop, the main process would block and wait for each process to finish before even starting the next one, which would be no different from running serially.
The output:

Sleeping 1 second(s)...
Sleeping 1 second(s)...
Sleeping 1 second(s)...
Sleeping 1 second(s)...
Done sleeping...
Done sleeping...
Done sleeping...
Done sleeping...
Finished in 2.86 second(s)

The total is a bit over 2 seconds, so the four processes did run in parallel; the extra time beyond the 1-second sleeps is again process startup overhead.
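The effect of where join() goes is easy to demonstrate. The sketch below uses threading.Thread, which shares the same start()/join() API as multiprocessing.Process, so it runs anywhere without a __main__ guard; the snooze helper, the 0.2-second sleep, and the count of 4 are illustrative choices, not from the article.

```python
import time
import threading

def snooze(seconds=0.2):
    time.sleep(seconds)

def start_then_join_inline(n=4):
    # join() right after start(): each thread must finish before the
    # next one starts, so the sleeps run one after another.
    t0 = time.perf_counter()
    for _ in range(n):
        t = threading.Thread(target=snooze)
        t.start()
        t.join()
    return time.perf_counter() - t0

def start_all_then_join_all(n=4):
    # start everything first, then join in a second loop: the sleeps overlap.
    t0 = time.perf_counter()
    threads = [threading.Thread(target=snooze) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - t0

serial = start_then_join_inline()
parallel = start_all_then_join_all()
print(f'inline join: {serial:.2f}s, separate join loop: {parallel:.2f}s')
```

The inline-join version takes roughly n times the sleep, while the separate join loop takes roughly one sleep, which is the whole point of splitting start() and join().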

Using concurrent.futures for parallelism

The multiprocessing module gets the job done, but the code is a bit verbose. Let's redo this with concurrent.futures, introduced in Python 3.2. We no longer need the multiprocessing module directly, so the import multiprocessing line can be removed.

At the start of the file, import the module:

import concurrent.futures

To launch a single task, the code can be written like this:

def with_concurrent():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        result = executor.submit(do_something)
        print(result.result())

Now the output is:

Sleeping 1 second...
Done sleeping...
None
Finished in 2.26 second(s)

Notice the None in the output. submit() returns a Future object, and result() gives back whatever the submitted function returned; since do_something only prints and has no return statement, its return value is None, and that is what gets printed. The example below illustrates the same point.

>>> [print(i) for i in range(4)]

You can pause here and work out for yourself what this should print.
The result:

0
1
2
3
[None, None, None, None]

There is no surprise in the numbers; the trailing [None, None, None, None] is the list of print's return values, because print returns None by default.

So we change the final print in do_something into a return. At the same time, we modify the function to accept the number of seconds to sleep as a parameter. The final function:

def do_something_for_concurrent(seconds=1):
    print(f'Sleeping {seconds} second(s)...')
    time.sleep(seconds)
    return f'Done sleeping {seconds} second(s)...'
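With a function that returns a value, Future.result() hands that value back instead of None. A minimal sketch of the same submit pattern follows; it uses ThreadPoolExecutor only so the example is self-contained and easy to run (the Executor API is identical for processes), and the do_work helper and 0.1-second sleep are illustrative, not from the article.

```python
import concurrent.futures
import time

def do_work(seconds):
    time.sleep(seconds)
    return f'Done sleeping {seconds} second(s)...'

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(do_work, 0.1)
    print(future.result())  # the function's return value, no longer None
```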

Now we submit multiple tasks inside with_concurrent:

def with_concurrent():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = [executor.submit(do_something_for_concurrent, sec) for sec in range(1, 10)]
        for f in concurrent.futures.as_completed(results):
            print(f.result())

Here all the tasks are submitted at once with a list comprehension, and then

 for f in concurrent.futures.as_completed(results):
     print(f.result())

pulls each result out as its task completes.

The output:

Sleeping 1 second(s)...
Sleeping 2 second(s)...
Sleeping 3 second(s)...
Sleeping 4 second(s)...
Sleeping 5 second(s)...
Done sleeping 1 second(s)...
Sleeping 6 second(s)...
Done sleeping 2 second(s)...
Sleeping 7 second(s)...
Done sleeping 3 second(s)...
Done sleeping 4 second(s)...
Sleeping 8 second(s)...
Sleeping 9 second(s)...
Done sleeping 5 second(s)...
Done sleeping 6 second(s)...
Done sleeping 7 second(s)...
Done sleeping 8 second(s)...
Done sleeping 9 second(s)...
Finished in 16.18 second(s)

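One detail worth noticing in the output above: as_completed yields futures in the order they finish, not the order they were submitted. The sketch below makes this visible by submitting the longest sleep first; it is thread-based so it stays self-contained, and the snooze helper and sleep times are illustrative choices.

```python
import concurrent.futures
import time

def snooze(seconds):
    time.sleep(seconds)
    return seconds

completion_order = []
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # submit in the order 0.6, 0.4, 0.2 -- longest sleep first
    futures = [executor.submit(snooze, s) for s in (0.6, 0.4, 0.2)]
    for f in concurrent.futures.as_completed(futures):
        completion_order.append(f.result())

print(completion_order)  # shortest sleep finishes first: [0.2, 0.4, 0.6]
```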

This is still somewhat verbose. Whenever we have a list and apply the same operation to each element, the natural tool is the higher-order function map, for example:

>>> list(map(lambda x:x**2,[1,2,3]))
[1, 4, 9]

concurrent.futures executors provide a map method with the same flavor. Change the body of with_concurrent to:

def with_concurrent():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        secs = list(range(1, 10))
        results = executor.map(do_something_for_concurrent, secs)
        for result in results:
            print(result)

Run it again and check the result:

Sleeping 1 second(s)...
Sleeping 2 second(s)...
Sleeping 3 second(s)...
Sleeping 4 second(s)...
Sleeping 5 second(s)...
Done sleeping 1 second(s)...
Sleeping 6 second(s)...
Done sleeping 2 second(s)...
Sleeping 7 second(s)...
Done sleeping 3 second(s)...
Sleeping 8 second(s)...
Done sleeping 4 second(s)...
Sleeping 9 second(s)...
Done sleeping 5 second(s)...
Done sleeping 6 second(s)...
Done sleeping 7 second(s)...
Done sleeping 8 second(s)...
Done sleeping 9 second(s)...
Finished in 16.82 second(s)

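Note the contrast with as_completed: executor.map returns results in the order of the inputs, even when a later input finishes earlier, which is why the Done lines above come back as 1 through 9. A quick thread-based sketch (the snooze helper and sleep times are illustrative):

```python
import concurrent.futures
import time

def snooze(seconds):
    time.sleep(seconds)
    return seconds

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # 0.2 finishes first, but map still yields results in input order
    results = list(executor.map(snooze, [0.6, 0.4, 0.2]))

print(results)  # [0.6, 0.4, 0.2] -- the submission order
```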

Here is the final code file:

import time
import multiprocessing
import concurrent.futures

def do_something_for_concurrent(seconds=1):
    print(f'Sleeping {seconds} second(s)...')
    time.sleep(seconds)
    return f'Done sleeping {seconds} second(s)...'

def do_something():
    print('Sleeping 1 second(s)...')
    time.sleep(1)
    print('Done sleeping...')

def no_multiprocessing():
    for _ in range(10):
        do_something()

def with_concurrent():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        secs = list(range(1, 10))
        results = executor.map(do_something_for_concurrent, secs)
        for result in results:
            print(result)


def with_multiprocessing():

    processes_list = []
    for _ in range(4):
        p = multiprocessing.Process(target=do_something)
        p.start()
        processes_list.append(p)

    for process in processes_list:
        process.join()


if __name__ == '__main__':
    start = time.perf_counter()
    # no_multiprocessing()
    # with_multiprocessing()
    with_concurrent()
    end = time.perf_counter()
    print(f'Finished in {round(end - start, 2)} second(s)')