本站消息

站长简介


前每日优鲜python全栈开发工程师,自媒体达人,逗比程序猿,钱少话少特宅,我的公众号:想吃麻辣香锅

  python大神匠心打造,零基础python开发工程师视频教程全套,基础+进阶+项目实战,包含课件和源码

  出租广告位,需要合作请联系站长



+关注
已关注

分类  

暂无分类

标签  

暂无标签

日期归档  

2020-06(12)

2020-07(30)

2020-08(30)

2020-09(66)

2020-10(79)

Python学习23--多线程和多进程

发布于2021-04-25 21:12     阅读(228)     评论(0)     点赞(11)     收藏(3)


0

1

2

3

4

5

6



一、基本概念

1. 异步和同步
关注的是消息通信机制,行为方式,描述的是多个(线程、进程)相互之间的关系。

同步:调用者主动等待被调用者返回结果,在没有返回结果之前就一直专职等待。
比如:打电话,拨号之后,什么都不干,专职等待,拨号的动作和其他的动作之间“同步关系”

异步:调用者发送请求之后,不会专职等待被调用者返回结果,而是当被调用者有结果之后,
通知调用者,然后调用者再回头继续做刚才的任务。
比如:打电话,拨号了之后,我放免提,继续编代码,什么时候接通了,什么时候开始说话。

2. 阻塞和非阻塞
关注的是程序在等待调用结果的时候的状态。

阻塞:在调用结果返回之前,当前的线程会被挂起,调用的线程只有在得到结果之后才可以继续向下执行
比如:打电话,拨号之后,一直等,等到拨号接通之后…(拨号动作做是个阻塞函数,等阻塞状态)

非阻塞:在调用返回结果之前,当前的线程不会被挂起,而是继续执行其他的动作。
比如:打电话,拨号之后,不等待,继续其他的事情(拨号的动作就非阻塞函数,当前的线程处于非阻塞状态)

同步异步和阻塞与非阻塞:

阻塞和非阻塞与同步和异步之间没有必然的联系,是两个概念,从两个角度解释的任务

阻塞是同步机制的结果。非阻塞是使用异步机制的结果。

非阻塞不是一定意味着异步。 非阻塞只能意味着,方法调用后,并不阻塞当前的任务,但是到底要不要
继续调用其他的线程,那不是非阻塞决定的,是异步决定。

阻塞一定意味着任务之间一定不是异步的关系。

异步不只意味着方法是非阻塞,同时意味着工作会转移到其他的线程。

同步一定意味着,在某一个时刻,某一个任务是处于阻塞状态。

3. 串行 和并行
串行:多个任务,按照顺序执行。从概念上,可以理解成多个任务同步执行。
多个任务串行执行,任务之间一定是同步的关系

并行:多个任务同时执行,宏观上和微观上,分为并行和并发。

4.并行和并发
都是同时处理的含义,并行才是真正的同时处理。

区分并行和并发:
时间上:并行多个事件在同一个时刻同时发生
并发多个事件在同一个时间间隔发生。其实切片时间片轮流执行。

作用点:
并行作用在多个实体上的多个事件。
并发作用在同一个实体上的多个事件。

处理个数:
并行是多个处理器处理多个任务
并发是一个处理器处理多个任务

举例:
发试卷:
串行:1个人,给每个人,挨个发时间
并行:班长1 班长2 两个人同时发试卷。
并发:对于班长1 既要发试卷又关门。

5. 并发的三个层次
低阶:对于底层的操作系统实现并发,高端
中阶:高级语言大多都支持中阶并发。编程语言上实现的并发。创建多线程,多进程
高阶:对中阶的实现进行了封装。第三方模块开发好的框架。

二、多进程

创建进程的方式:
1、mutiprocessing.Process 指定target args,创建进程
2、继承mutiprocessing.Process 重写run方法
3、 fork()函数 ,只能在linux下使用。
4.、其他创建方式

做一个累加求和的方法,分别模拟计算密集型和IO密集型

第一次:串行执行:

计算密集型时间:1.26s
IO密集型计算时间:15.01s

import time
def sum(a,b):
    s = 0
    for i in range(a,b):
        s+=i
        time.sleep(0.5)
    return s

def sum1(a,b):
    return sum(0,a),sum(0,b)
#串行执行

if __name__ == "__main__":
    start_time = time.time()
    #计算密集型,sum函数中不执行sleep函数
    # sum1(10000000,20000000)
    #IO密集型,sum函数中执行sleep函数
    sum1(10,20)
    end_time = time.time()
    process_time = end_time-start_time
    print(process_time)

第二次:多进程执行
计算密集型时间:0.949s
IO密集型计算时间:10.1s

import time,multiprocessing
def sum(a,b):
    s = 0
    for i in range(a,b):
        s+=i
        # time.sleep(0.5)
    return s

#多进程执行
def sum2(a1,b1):
    p1 = multiprocessing.Process(target=sum,args=(0,a1))
    p2 = multiprocessing.Process(target=sum,args=(0,b1))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == "__main__":
    start_time = time.time()
    #计算密集型,sum函数中不执行sleep函数
    sum2(10000000,20000000)
    #IO密集型,sum函数中执行sleep函数
    # sum2(10,20)
    end_time = time.time()
    process_time = end_time-start_time
    print(process_time)

其他方式创建进程
【进程池】:负责维护一定数量进程,可以向进程池提交任务,如果进程池中的进程数量
没有达到上限,则会创建一个新的进程,执行任务。否则会在进程池外等待,等待进程池内进程
死亡或者结束,才能再次创建新的进程,接受任务。

使用进程池创建进程multiprocessing.Pool
同步申请 进程池.apply方法,向进程池中申请进程。
创建执行完一个进程结束之后,再创建执行另外一个进程(排队)
无论是子进程之间还是主进程和子进程之间,都是串行的。
apply(函数名,函数参数(元组))
如下:

from multiprocessing import Pool
import time
def work(i):
    print("{}子进程开始".format(i))
    time.sleep(0.1)
    print("{}子进程结束".format(i))

if __name__ == "__main__":
    #Pool(maxsize)maxsize进程池中进程数量的上限
    p=Pool(3)
    for i in range(3):
        p.apply(work,args=(i,))
    print("主进程运行结束")

运行结果为:

0子进程开始
0子进程结束
1子进程开始
1子进程结束
2子进程开始
2子进程结束
主进程运行结束

第三次:使用multiprocessing下的Pool实现
计算密集型:1.40s
IO密集型:15.12s
代码如下:

from multiprocessing import Pool
import time
def sum(a,b):
    s = 0
    for i in range(a,b):
        s+=i
        #模拟IO密集型
        # time.sleep(0.5)
    return s
#利用multiprocessing模块下的Pool方法分别模拟
#IO密集型:15.12
#计算密集型:1.40
def sum3(a,b):
    p = Pool(3)
    p.apply(sum,args=(0,a))
    p.apply(sum,args=(0,b))

if __name__ == "__main__":
    #Pool(maxsize)maxsize进程池中进程数量的上限
    start_time = time.time()
    sum3(10000000,20000000)
    end_time =time.time()
    process_time = end_time-start_time
    print(process_time)

使用进程池创建进程multiprocessing.Pool下的异步申请apply_async()
程池实现的异步申请,子进程类似于后台进程,主进程执行完毕,不会再执行子进程了。
join方法 :能够子进程抢占时间片,强迫阻塞主进程
使用join之前必须使用close或者terminate,保证进程池中的进程,不再接受新的进程申请了。
close:暂时关闭进程池,不接受新任务
terminate: 真正关闭进程池

主进程执行完毕,不会执行子进程,如下:

from multiprocessing import Pool
import time
def work(i):
    print("{}子进程运行".format(i))
    time.sleep(5)

if __name__ == "__main__":
    start_time = time.time()
    p = Pool(3)
    for i in range(3):
        p.apply_async(work,args=(i,))
    end_time = time.time()
    process_time = end_time-start_time
    print("主进程结束,运行时间为{}".format(process_time))

输出:

主进程结束,运行时间为0.03690290451049805

使用close和join方法来抢占主进程的时间片

from multiprocessing import Pool
import time
def work(i):
    print("{}子进程运行".format(i))
    time.sleep(5)

if __name__ == "__main__":
    start_time = time.time()
    p = Pool(3)
    for i in range(3):
        p.apply_async(work,args=(i,))
    p.close()
    p.join()
    end_time = time.time()
    process_time = end_time-start_time
    print("主进程结束,运行时间为{}".format(process_time))

输出:

0子进程运行
1子进程运行
2子进程运行
主进程结束,运行时间为5.175220727920532

回调函数:保证使用appy_aysnc异步提交的任务执行结束时,调用回调函数
能够保证所有的进程任务执行完毕之后都去执行回调函数的内容(可以对不同进程统一定义回调函数)
回调函数:需要定义一个参数,参数就是任务的返回值
如下所示:

from multiprocessing import Pool
import time
def work(i):
    print("{}子进程运行开始".format(i))
    time.sleep(0.1)
    return i

def callback(index):
    print("{}子进程运行结束".format(index))

if __name__ == "__main__":
    start_time = time.time()
    p = Pool(3)
    for i in range(5):
        p.apply_async(work,args=(i,),callback=callback)
    p.close()
    p.join()
    end_time = time.time()
    process_time = end_time-start_time
    print("主进程结束,运行时间为{}".format(process_time))

输出:

0子进程运行开始
1子进程运行开始
2子进程运行开始
3子进程运行开始
0子进程运行结束
4子进程运行开始
1子进程运行结束
2子进程运行结束
3子进程运行结束
4子进程运行结束
主进程结束,运行时间为0.35140514373779297

第四次:使用multiprocessing下的Pool的appy_aysnc异步实现
计算密集型:1.05s
IO密集型:10.2s
执行如下:

from multiprocessing import Pool
import time
def sum(a,b):
    s = 0
    for i in range(a,b):
        s+=i
        # time.sleep(0.5)
    return s

def sum4(a,b):
    p = Pool(3)
    p.apply_async(sum,args=(0,a))
    p.apply_async(sum,args=(0,b))
    p.close()
    p.join()

if __name__ == "__main__":
    start_time = time.time()
    sum4(10000000,20000000)
    end_time = time.time()
    process_time = end_time-start_time
    print("运行时间为{}".format(process_time))

如果想得到计算结果,可以通过回调函数实现,如下:

from multiprocessing.pool import Pool
import time
def sum(a,b):
    s = 0
    for i in range(a,b):
        s+=i
    return s
def call_back(s):
    print(s)

def sum2(a,b):
    pool = Pool(3)
    pool.apply_async(sum,args=(0,a),callback=call_back)
    pool.apply_async(sum,args=(0,b),callback=call_back)
    pool.close()
    pool.join()

if __name__ =="__main__":
    start_time = time.time()
    sum2(10000000,2000000)
    end_time =time.time()
    print(end_time-start_time)

输出:

1999999000000
49999995000000
0.6505746841430664

加大线程池5,一次创建10个线程
计算密集型:4.29
IO密集型:35.27

from multiprocessing import Pool
import time
def sum(a,b):
    s = 0
    for i in range(a,b):
        s+=i
        time.sleep(0.5)
    return s
def sum4(a,b):
    p = Pool(5)
    for i in range(10):
        p.apply_async(sum,args=(0,a))
        p.apply_async(sum,args=(0,b))
    p.close()
    p.join()

if __name__ == "__main__":
    start_time = time.time()
    sum4(10,20)
    end_time = time.time()
    process_time = end_time-start_time
    print("运行时间为{}".format(process_time))

以上所有模拟时间总结汇总如下表:
在这里插入图片描述

三、多线程

第一次:直接创建多线程
IO密集型:10.01
计算密集型:1.27

import threading,time
def sum(a,b):
    s = 0
    for i in range(a,b):
        s+=i
        #IO密集型
        time.sleep(0.5)
    return s

def sum1(a,b):
    t1 = threading.Thread(target=sum,args=(0,a))
    t2 = threading.Thread(target=sum,args=(0,b))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

if __name__ == "__main__":
    start_time = time.time()
    sum1(10,20)
    end_time = time.time()
    process_time = end_time-start_time
    print(process_time)在这里插入代码片

第二次:使用线程池
IO密集型:20.12
计算密集型:2.7

import threading,time
from multiprocessing.pool import ThreadPool
def sum(a,b):
    s = 0
    for i in range(a,b):
        s+=i
        #IO密集型
        time.sleep(0.5)
    return s

def sum2(a,b):
    pool = ThreadPool(3)
    for i in range(3):
        pool.apply_async(sum,args=(0,a))
        pool.apply_async(sum,args=(0,b))

    pool.close()
    pool.join()

if __name__ == "__main__":
    start_time = time.time()
    sum2(10,20)
    end_time = time.time()
    process_time = end_time-start_time
    print(process_time)

使用threadpool下的线程池
首先要安装第三方包:pip install threadpool
使用第三方包下的线程池使用步骤为:
(1)引入threadpool模块
(2)定义线程函数
(3)创建线程池threadpool.ThreadPool(maxsize)
(4)创建需要线程池处理的任务列表
requests= threadpool.makeRequests(),
参数列表:[((参数1,参数2),(字典型参数1,字典型参数2))]
如果参数有缺失,要用None代替,如:

argslist = [((0,a),None),((0,b),None)]
resquest = threadpool.makeRequests(sum,args_list=argslist)

(5)将创建的多个任务put到线程池中。
线程池对象.putRequests(任务)
(6) 等到所有任务处理完毕
线程池对象.wait()----与先用close再用join效果类似

具体使用方式如下:

import threadpool,time

def work(i):
    print("{}开始执行".format(i))
    time.sleep(0.1)
    print("{}执行结束".format(i))

if __name__ == "__main__":
    pool=threadpool.ThreadPool(3)
    resquest = threadpool.makeRequests(work,args_list=[0,1,2])
    for r in resquest:
        pool.putRequest(r)
    pool.wait()
    print("主线程执行完毕")

输出:

0开始执行
1开始执行
2开始执行
0执行结束
1执行结束
2执行结束
主线程执行完毕

第三次:使用threadpool下的线程池进行模拟
IO密集型:10.00
计算密集型:1.28
代码如下:

import threadpool,time

def sum(a,b):
    s = 0
    for i in range(a,b):
        s+=i
        # time.sleep(0.5)
    return s

def sum3(a,b):
    pool = threadpool.ThreadPool(3)
    argslist = [((0,a),None),((0,b),None)]
    resquest = threadpool.makeRequests(sum,args_list=argslist)
    for r in resquest:
        pool.putRequest(r)
    pool.wait()

if __name__=="__main__":
    start_time = time.time()
    sum3(10000000,20000000)
    end_time = time.time()
    process_time = end_time-start_time
    print("主进程结束")
    print(process_time)

各种运行方式时间总结如下:
在这里插入图片描述

四、concurrent.futures包下的进程池线程池使用

ThreadPoolExecutor:线程池异步调用
ProcessPoolExecutor:进程池异步调用
抽象类Executor:(一个类不能直接创建对象,可以被继承)
是一个上下文管理
实现了__enter__,__exit__
如:

with ThreadPoolExecutor(max_workers=3) as executor:
    pass

ThreadPoolExecutor和ProcessPoolExecutor是Executor的扩展类
使用ThreadPoolExecutor和ProcessPoolExecutor,max_workers最大线程或者进程数,默认是4
executor 相当于每一个进程或者线程的调用者,负责将线程或者进程,加入到线程池或者进程池

1.submit(fun,*args,**kwargs)
fun:任务函数(需要被异步执行的函数)
*args,**kwargs:函数的参数(不需要以元组的形式)
submit的返回值,即将要异步执行的任务(futures)
任务.result() 获得异步调用该函数时的返回值(注意:以阻塞的方式返回)
如下所示:

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor,as_completed,wait
import time
def doTasks(i):
    s = "{}发试卷".format(i)
    time.sleep(0.5)
    return s

li = ["人员1","人员2","人员3","人员4"]
with ThreadPoolExecutor(max_workers=3) as executor:
    for i in li:
        task = executor.submit(doTasks,i)
        print(task.result())

输出:

人员1发试卷
人员2发试卷
人员3发试卷
人员4发试卷

如果有多个参数需要传入,直接按列表传入,如下所示:

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor,as_completed,wait
import time
def doTasks(a,b):
    s = "{}发试卷{}".format(a,b)
    time.sleep(0.5)
    return s

li = [(1,"a"),(2,"b")]
with ThreadPoolExecutor(max_workers=3) as executor:
    for x,y in li:
        task = executor.submit(doTasks,x,y)
        print(task.result())

输出:

1发试卷a
2发试卷b

或者直接用列表推导式的方法直接将参数传入,如下:

with ThreadPoolExecutor(max_workers=3) as executor:
        task = [executor.submit(doTasks,x,y) for x,y in [(1,"a"),(2,"b")]]
        for t in task:
            print(t.result())

输出:

1发试卷a
2发试卷b

相关的方法:
running() 可以显示执行那一刻的线程运行状态
done() 执行那一刻线程完成的状态
如下所示:

with ThreadPoolExecutor(max_workers=3) as executor:
        task = [executor.submit(doTasks,x) for x in range(5)]
        for index,t in enumerate(task):
            print("{}的执行状态是{},完成状态是{}".format(index,t.running(),t.done()))

输出:

0的执行状态是False,完成状态是True
1的执行状态是False,完成状态是True
2的执行状态是False,完成状态是True
3的执行状态是False,完成状态是True
4的执行状态是False,完成状态是True

2.as_completed 获得已经执行完毕的任务
as_completed(待执行的任务,timeout)
as_completed不能保证按照原来list任务列表的顺序执行
如下:


```python
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor,as_completed,wait
import time
def doTasks(a):
    s = "{}发试卷".format(a)
    time.sleep(0.5)
    return s


with ThreadPoolExecutor(max_workers=3) as executor:
        task = [executor.submit(doTasks,x) for x in range(100)]
        for index,t in enumerate(as_completed(task)):
            print("{}的执行状态是{},完成状态是{},返回值是{}".format(index,t.running(),t.done(),t.result()))
其中的一段输出:

```python
59的执行状态是False,完成状态是True,返回值是58发试卷
60的执行状态是False,完成状态是True,返回值是60发试卷
61的执行状态是False,完成状态是True,返回值是62发试卷
62的执行状态是False,完成状态是True,返回值是61发试卷
63的执行状态是False,完成状态是True,返回值是63发试卷

3.wait(待执行的任务列表,timeout,return_when):
return_when的参数:
ALL_COMPLETED :程序会一直阻塞,阻塞到线程池里面的所有任务全部完成
FIRST_COMPLETED : 保证会少一个任务完成

如下所示:ALL_COMPLETED

with ThreadPoolExecutor(max_workers=3) as executor:
        task = [executor.submit(doTasks,x) for x in range(3)]
        done,undone = wait(task,return_when=ALL_COMPLETED)
        for i in done:
            print(i)
        print("==============")
        for j in undone:
            print(j)

输出:

<Future at 0x1e19e980a20 state=finished returned str>
<Future at 0x1e19eae4b38 state=finished returned str>
<Future at 0x1e19ed82978 state=finished returned str>
==============

如下所示:FIRST_COMPLETED

with ThreadPoolExecutor(max_workers=3) as executor:
        task = [executor.submit(doTasks,x) for x in range(3)]
        done,undone = wait(task,return_when=FIRST_COMPLETED)
        for i in done:
            print(i)
        print("==============")
        for j in undone:
            print(j)

输出:

<Future at 0x187194f0a20 state=finished returned str>
==============
<Future at 0x187198a8400 state=running>
<Future at 0x187199029b0 state=running>

wait中ALL_COMPLETED == as_completed()
不同 地方在于ALL_COMPLETED 会一直阻塞,到执行完成之后才会执行下面的进程
as_completed 阻塞到 一次性完成(最大线程数)的任务,可以继续执行下面的进程。

4. map()函数
取代submit操作,返回值是任务调用的返回值集合。
map能够确保任务执行完毕,相当于使用as_completed
map(func,iterables)
func 要执行异步调用的函数名,iterables 多次任务传入的参数
如果任务只有单参数,多个任务可以打包传入[1,2,3]
如果任务是多个参数,则需要传入多个元组
(任务一的参数1,任务二的参数1),(任务一的参数2,任务二的参数2)
如下:

with ThreadPoolExecutor(max_workers=3) as executor:
      results= executor.map(doTasks,[1,2,3,4,5])
      for i in results:
          print(i)

输出:

1发试卷
2发试卷
3发试卷
4发试卷
5发试卷

当有多个参数传入时,按如下方法进行传入:

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor,as_completed,\
    wait,ALL_COMPLETED,FIRST_COMPLETED
import time
def doTasks(a,b):
    s = "{}发试卷{}".format(a,b)
    time.sleep(0.5)
    return s

l = [(1,"a"),(2,"b")]
with ThreadPoolExecutor(max_workers=3) as executor:
      results = executor.map(doTasks,(1,2),("a","b"))
      results1= executor.map(doTasks,*list(zip(*l)))
      for x,y in results,results1:
          print(x,y)

输出:

1发试卷a 2发试卷b
1发试卷a 2发试卷b

submit和map对比:
1、 map很好,代码简洁,执行之后,执行的顺序是按照原来任务的顺序
缺点, 不能够给不同的任务执行map,只能执行相同的任务。
2.、submit:一般习惯跟as_completed,不按任务的执行顺序
优点:可以执行不同的任务,使得不同的任务之间并行

用concurrect.futures包下的线程池模拟
IO密集型:10.01
计算密集型:0.88
代码如下:

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor,as_completed,\
    wait,ALL_COMPLETED,FIRST_COMPLETED
import time
import time
def sum(a,b):
    s = 0
    for i in range(a,b):
        s+=i
        # time.sleep(0.5)
    return s
#IO密集型:10.01
#计算密集型:0.88
def sum5(a,b):
    with ThreadPoolExecutor(max_workers=3) as executor:
        results=executor.map(sum,(0,0),(a,b))

if __name__ == "__main__":
    start_time = time.time()
    sum5(1000000,20000000)
    end_time = time.time()
    print(end_time-start_time)

五、运行结果的获取

进程的运行结果获取:可以用变量直接接收,异步时需要用到get()方法
线程的运行结果获取:可以用变量直接接收运行结果,也可以设置一个全局变量在被调用函数中接收结果
第三方库时的运行结果接收用callback函数,注意callback函数定义时有两个变量:

#进程的运行结果获取
def sum3(b1,b2):
    pool=Pool(3)
    result=[]
    #同步获得函数返回值,就是apply函数的返回值
    result.append(pool.apply(sum,args=(0,b1)))
    # print("result的值={}".format(result))
    result.append(pool.apply(sum,args=(0,b2)))
    return result


def sum4(b1,b2):
    pool=Pool(3)
    result=[]
    # 异步方式获得函数的返回值,是调用apply_async获得一个对象,调用对象的get方法
    result.append(pool.apply_async(sum,args=(0,b1)).get())
    result.append(pool.apply_async(sum,args=(0,b2)).get())
    pool.close()
    pool.join()
    return result

#线程的运行结果获取
import threading,time
result=[]
def sum(a,b):
    s=0
    for i in range(a,b):
        s+=i
        # time.sleep(0.5)
    result.append(s)
    return s


def sum2(b1,b2):
    pool = ThreadPool(3)
    result=[]
    for i in range(10):
        result.append(pool.apply_async(sum,args=(0,b1)).get())
        result.append(pool.apply_async(sum,args=(0,b2)).get())
    pool.close()
    pool.join()
    return result
 
 #第三方库Threadpool运行值获取
 def back(request,s):
    print(s)
def sum3(b1,b2):
    pool=threadpool.ThreadPool(5)
    asl=[((0,b1),None),((0,b2),None)]
    requests=threadpool.makeRequests(sum,args_list=asl,callback=back)
    for i in requests:
        pool.putRequest(i)
    pool.wait()

原文链接:https://blog.csdn.net/weixin_46197111/article/details/116035091




0

1

2

3

4



所属网站分类: 技术文章 > 博客

作者:dfd323

链接:https://www.pythonheidong.com/blog/article/953260/09df63a1ced14e6c055a/

来源:python黑洞网

任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任

11 0
收藏该文
已收藏

评论内容:(最多支持255个字符)