多任务

本文最后更新于:2023年12月5日 晚上

并发: 任务多于 cpu 核心数, 通过操作系统的各种任务调度算法, 实现多个任务”一起”执行(实际上总有一些任务不在执行, 因为切换任务的速度非常快, 看上去是一起执行而已)

并行: 指的是任务数小于等于 cpu 核心数, 即任务真的是一起执行

进程、线程、协程的区别

  • 进程是资源分配的单位
  • 线程是操作系统调度的单位
  • 进程切换需要的资源很大, 效率很低
  • 线程稍好
  • 协程最好
  • 多进程、线程根据 cpu 核数不一样可能是并行的,但是协程是在一个线程中,所以肯定是并发

线程 thread

  1. python 的 thread 模块是比较底层的模块, threading 模块对 thread 做了一些包装, 可以更加方便的被调用
  2. Python 的 threading.Thread 类有一个 run 方法, 用于定义线程的功能函数, 可以在自己的线程中覆盖该方法, 而创建自己的线程实例后, 通过 Thread 的 start 方法启动该线程, 交给 Python 虚拟机进行调度, 当该线程获得指定机会的时候, 就会调用 run 方法指定线程
  3. 多线程程序的执行顺序是不确定的

线程池

关于设置线程池的进程数:
设 cpu 核心数为 N
IO 密集型: 线程池设置为 2N + 1
CPU 密集型: 线程池设置为 N + 1
如果不设置, 默认就是 N

总结:

  1. 每个线程都有一个默认的名字, python 自动为线程指定一个名字
  2. 当线程的 run 方法结束时, 该线程结束
  3. 无法控制线程调度程度, 但可以通过其他方式来影响线程调度方式

多线程 - 共享全局变量

  • 在一个进程内的所有线程共享全局变量, 很方便在多个线程间共享数据
  • 缺点就是: 线程对全局变量随意更改可能造成多线程之间对全局变量的混乱(即线程非安全)
  • 如果多个线程同时对一个全局变量操作, 会出现资源竞争问题, 从而数据结果会不正确

进程

进程池中的 pid 都是固定的, 设置了最大链接数, 就有多少个 pid
通过进程池调用进程, 进程池满了之后, 当前进程执行完, 后面排队的进程才会执行

进程间通信

使用 Queue

from multiprocessing import Queue
q = Queue(10) # 初始化一个Queue对象, 最多可以接受10条消息
# q.qsize()  返回当前队列包含的消息数量
# q.empty()  判断队列是否为空, 为空返回true
# q.full()  判断队列是否已满
# q.get([block[,timeout])  获取队列中的一条消息, 然后将其从队列中移除, block默认为true, 表示如果队列中没有消息则等待timeout秒, 没有设置timeout的话就会一直等待下去
# q.get_nowait() 相当于q.get(false),表示队列为空,不等待马上抛出"Queue.Empty"异常
# q.put(item[,block[,timeout]])  将item写入队列, block默认值是true, 表示如果队列已满, 则等待timeout秒, 不设置timeout的话就一直等待下去
# q.put_nowait(item) 相当于Queue.put(item,false), 如果队列已满, 直接抛出"Queue.Full"异常

进程池

当需要创建的子进程数量不多时,可以直接利用 multiprocessing 中的 Process 动态成生多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到 multiprocessing 模块提供的 Pool 方法。
初始化 Pool 时,可以指定一个最大进程数,当有新的请求提交到 Pool 中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务。

from multiprocessing import Pool
def worker(msg):
    print(msg)
po=Pool(3) #定义一个进程池,最大进程数3
for i in range(10):
    po.apply_async(worker,(i))
print("----start----")
po.close() #关闭进程池,关闭后po不再接收新的请求
po.join() #等待po中所有子进程执行完成,必须放在close语句之后
print("-----end-----")

# apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表,kwds为传递给func的关键字参数列表;
# close():关闭Pool,使其不再接受新的任务;
# terminate():不管任务是否完成,立即终止;
# join():主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用;

进程池中的进程之间的通信

如果要使用 Pool 创建进程,就需要使用 multiprocessing.Manager()中的 Queue(),而不是 multiprocessing.Queue(),否则会得到一条如下的错误信息:RuntimeError: Queue objects should only be shared between processes through inheritance.

# -*- coding:utf-8 -*-

# 修改import中的Queue为Manager
from multiprocessing import Manager,Pool
import os,time,random

def reader(q):
    print("reader启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
    for i in range(q.qsize()):
        print("reader从Queue获取到消息:%s" % q.get(True))

def writer(q):
    print("writer启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
    for i in "itcast":
        q.put(i)

if __name__=="__main__":
    print("(%s) start" % os.getpid())
    q = Manager().Queue()  # 使用Manager中的Queue
    po = Pool()
    # 使用阻塞模式创建进程,这样就不需要在reader中使用死循环了,可以让writer完全执行完成后,再用reader去读取
    po.apply_async(writer, (q,))

    time.sleep(1)  # 先让上面的任务向Queue存入数据,然后再让下面的任务开始从中取数据

    po.apply_async(reader, (q,))
    po.close()
    po.join()
    print("(%s) End" % os.getpid())

GIL:

在 Python 多线程中, 每个线程的执行顺序:

  1. 获取 GIL
  2. 执行代码知道 sleep 或者是 python 虚拟机将其挂起
  3. 释放 GIL

可见: 某个线程想要执行, 必须先拿到 GIL, 可以把 GIL 看作是”通行证”, 并且在一个 python 中, GIL 只有一个。拿不到通信证的线程,就不允许进入 CPU 执行。所以, python 中的多线程其实是”伪多线程”, 不管我们开多少线程, 都不能做到并行执行, 无法利用 cpu 的多核优势

总结 :  在 python 中, 对于多核心 CPU

  1. CPU 密集型任务适合多进程, 充分利用 CPU 多核心的计算优势, 多进程一般用于资源隔离, 或者弥补那些多核支持不好的语言。Python 中的多进程就属于后者
  2. IO 密集型任务适合多线程, 因为 IO 密集型代码瓶颈是 IO 不是 CPU, 用多线程比多进程适合, 多进程耗费的系统资源多, 切换速度慢

话说回来: CPU 密集型任务由于主要消耗 CPU 资源,因此,代码运行效率至关重要。Python 这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用 C 语言编写。

不过对于 IO 密集型任务,涉及到网络、磁盘 IO 的任务都是 IO 密集型任务,这类任务的特点是 CPU 消耗很少,任务的大部分时间都在等待 IO 操作完成(因为 IO 的速度远远低于 CPU 和内存的速度)。对于 IO 密集型任务,任务越多,CPU 效率越高,但也有一个限度。常见的大部分任务都是 IO 密集型任务,比如 Web 应用。

IO 密集型任务执行期间,99%的时间都花在 IO 上,花在 CPU 上的时间很少,因此,用运行速度极快的 C 语言替换用 Python 这样运行速度极低的脚本语言,完全无法提升运行效率。对于 IO 密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C 语言最差。

协程

协程,又称微程序,纤程。英文名 Coroutine。

协程是 python 个中另外一种实现多任务的方式,只不过比线程更小占用更小执行单元(理解为需要的资源)。 为啥说它是一个执行单元,因为它自带 CPU 上下文。这样只要在合适的时机, 我们可以把一个协程 切换到另一个协程。 只要这个过程中保存或恢复 CPU 上下文那么程序还是可以运行的。

通俗的理解:在一个线程中的某个函数,可以在任何地方保存当前函数的一些临时变量等信息,然后切换到另外一个函数中执行,注意不是通过调用函数的方式做到的,并且切换的次数以及什么时候再切换到原来的函数都由开发者自己确定, 比如子程序 A、B

def A():
    print('1')
    pirnt('2')
    print('3')
def B():
    print('a')
    print('b')
    print('c')

假设由协程执行,在执行 A 的过程中,可以随时中断,去执行 B,B 也可能在执行过程中中断再去执行 A,结果可能是:

1
2
x
y
3
z

但是在 A 中是没有调用 B 的,所以协程的调用比函数调用理解起来要难一些。
看起来 A、B 的执行有点像多线程,但协程的特点在于是一个线程执行,那和多线程比,协程有何优势?

最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。
因为协程是一个线程执行,那怎么利用多核 CPU 呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。

Python 对协程的支持是通过 generator 实现的。
例子:

def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        r = '200 OK'

def produce(c):
    c.send(None) # 启动
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n) # 生产n, 将n传递消费者
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

c = consumer()
produce(c)

consumer 是一个 generator , 把一个 consumer 传入 produce 后:

  1. c.send(None) 启动生成器

  2. 一旦生产了东西, 通过 c.send(n)切换到 consumer 执行

  3. consumer 通过 yield 拿到消息, 处理, 又通过 yield 把结果传回

  4. produce 拿到 consumer 处理的结果, 继续生产下一条消息

  5. produce 决定不生产了, 通过 c.close()关闭 consumer, 整个过程结束


多任务
http://blog.lujinkai.cn/Python/基础/多任务/
作者
像方便面一样的男子
发布于
2020年12月9日
更新于
2023年12月5日
许可协议