Python 内置的队列 Queue
在 python 的众多标准库中,有一个 Queue 类,可以帮助我们实现一些简单的,队列相关的业务需求。
queue 模块实现了多生产者、多消费者队列。而且这个队列是线程安全的。

可以看出:Queue 内部通过线程锁,来保证队列的线程安全。
该模块实现了三种类型的队列,它们的区别仅仅是元素取回的顺序,包含如下:
FIFO:(first in first out) 先进先出; LIFO:(last in first out) 后进先出; 优先级队列:每次取出的是最小值 (采用堆的结构,内部使用的是内置模块 heapq),如下图所示: 
此外,内置方法中还提供了一个简单的 FIFO 队列类型,SimpleQueue
,提供使用。
四种 Queue 类
1. class queue.Queue(maxsize=0)
先进先出类型,maxsize
是个整数,用于设置可以放入队列中的元素数的上限。当达到这个大小的时候,插入操作将阻塞至队列中的元素被消费掉。如果 maxsize
小于等于零,队列长度为无限大。
2. class queue.LifoQueue(maxsize=0)
后进先出类型。
3. queue.PriorityQueue(maxsize=0)
优先级队列。(最小堆)
4. class queue.SimpleQueue(maxsize=0)
无界的 FIFO 队列构造函数。简单的队列,缺少任务跟踪等高级功能。无界的意思是,SimpleQueue 是基于双向队列实现的 (python 内置方法 deque,不需要锁)。

队列对象的实例方法
队列对象 (Queue, LifoQueue, 或者 PriorityQueue) 提供下列描述的公共方法。
Queue.qsize()
返回当前队列中元素的长度。

Queue.empty()
如果队列为空,返回
True
,否则返回False
。Queue.full()
如果队列是满的返回
True
,否则返回False
。Queue.put(item, block=True, timeout=None)
将
item
放入队列。官方解释:如果可选参数 block 是 true 并且 timeout 是 None (默认),则在必要时阻塞至有空闲插槽可用。如果 timeout 是个正数,将最多阻塞 timeout 秒,如果在这段时间没有可用的空闲插槽,将引发 Full 异常。反之 (block 是 false),如果空闲插槽立即可用,则把 item 放入队列,否则引发 Full 异常 ( 在这种情况下,timeout 将被忽略)。
这里自己总结一下,
block
参数表示插入时是否允许阻塞。当为True
时表示允许阻塞,直到有位置时再插入(阻塞时间超过timeout则引发异常);当为False
时,表示不允许阻塞,要立即插入,如果没有空位,则立即引发异常。timeout
参数,表示允许阻塞时,可以阻塞的最大时长,当timeout
为一个正数时,阻塞时间超过这个时长,则引发异常。Queue.put_nowait(item)
相当于
put(item, False)
,即立即插入,不阻塞等待。Queue.get(block=True, timeout=None)
从队列中移除并返回一个元素。意思是从队列中取出一条数据。
Queue.get_nowait()
相当于
get(False)
。立即取出一条数据,不阻塞等待。
官方还提供了两个方法,用于支持跟踪 排队的任务 是否 被守护的消费者线程 完整的处理。
Queue.task_done()
表示前面排队的任务已经被完成。被队列的消费者线程使用。每个 get() 被用于获取一个任务, 后续调用 task_done() 告诉队列,该任务的处理已经完成。
如果 join() 当前正在阻塞,在所有元素都被处理后,将解除阻塞(意味着每个 put() 进队列的元素的 task_done() 都被收到)。
如果被调用的次数多于放入队列中的项目数量,将引发 ValueError 异常 。
Queue.join()
阻塞至队列中所有的元素都被接收和处理完毕。
当元素添加到队列的时候,未完成任务的计数就会增加。每当消费者线程调用 task_done() 表示这个元素已经被回收,该元素所有工作已经完成,未完成计数就会减少。当未完成计数降到零的时候, join() 阻塞被解除。
意思就是,
put
方法会增加未完成任务的计数,task_done
方法会减少计数。官方demo
import threading, queue
q = queue.Queue()
def worker():
while True:
item = q.get()
print(f'Working on {item}')
print(f'Finished {item}')
q.task_done()
# 开启子线程去队列中取数据
threading.Thread(target=worker, daemon=True).start()
for item in range(30):
# 主线程往队列存数据
q.put(item)
print('All task requests sent\n', end='')
# 阻塞主线程,直到队列中所有的元素都被处理完毕
q.join()
print('All work completed')




