条件变量
所谓condition条件变量,是python提供一种线程之间同步机制,这种机制是在满足了特定的条件后,线程才可以访问相关的数据。
同步流程机制(和event类似):
一个线程等待特定的条件 另一个线程通知它条件已经发生。 一旦条件发生,该线程就会获取锁,从而独占共享资源的访问。
condition条件变量条件变量,线程间进行同步通信的时候的另一种锁机制,通常用于复杂的线程同步锁,它提供了比Lock, RLock更高级的功能,能空控制复杂的线程同步问题。
Condition 和互斥锁Lock和可重入锁RLock
互斥锁Lock和RLock只能提供简单的加锁和释放锁等功能
互斥锁Lock和RLock主要作用是在多线程访问共享数据时,保护共享数据,防止数据被脏读脏写,保证数据和关键代码的完整性
Condition类不仅自身依赖于Lock和RLock,即具有它们的阻塞特性,此外还提供了一些有利于线程通信,以及解决复杂线程同步问题的方法,它也被称作条件变量。
Condition和event
Condition和event形式上类似,都可以理解为基于事件的通知机制。
Condition使用通知唤醒等待的线程 event- 设置set()切换标记,也类似通知通知 Condition其实本质还是使用Rlock,但你可以指定使用Lock的方式来处理。
Condition 类的一些方法:
1)init
从Condition的对象创建中可以看出:
__init__(self,lock=None)
Condition 对象总是需要绑定关联一个锁,可以是外部传入的锁或是系统默认创建的锁。Condition的对象主要实现的对传入的锁的一个操作管理。
ps:如果不指定lock参数,那么Python会自动创建一个与之绑定的Lock对象。
2)acquire(timeout)
调用其实是调用Condition类关联的Lock/RLock的acquire()方法。
3)release
调用Condition类关联的Lock/RLock的release()方法。
4)wait(timeout)
此方法调用会让当前线程进入 Condition 的等待池等待通知并释放锁,直到其他线程调用该 Condition 的 notify() 或 notify_all() 方法来唤醒该线程。在调用该 wait() 方法时可传入一个 timeout 参数,指定该线程最多等待多少秒。
PS:timeout :这是一个可选参数,它指定线程等待通知调用的时间。如果超时,线程将被唤醒,重新获取锁,而控制将被返回。
PS:wait()必须在已获得Lock的前提下调用,否则会引起RuntimeError错误。
5)notify() 或 notify(n=1)
唤醒在该 Condition的waiting等待池池中的n个线程正在等待的线程并通知它,收到通知的线程将自动调用 acquire() 方法进行尝试加锁。
如果所有线程都在该 Condition 等待池中等待,则会选择唤醒其中一个线程,选择是任意性的(如果waiting池中有多个线程,随机选择n个唤醒,n是指定要唤醒的线程数量,默认为1.)
须在已获得Lock的前提下调用,否则将引发错误。
6)notify_all()
唤醒waiting池中的等待的所有线程并通知它们。
7)enter和exit
enter和exit使得对象支持上下文操作:
示例1:用条件变量来保持我们的Event中出现调用的顺序问题
-(A先说话,B回答,你一句我一句对话形式):
(PS:示例在两个线程的情况是是可以看到顺序是一致的,多线程的话就是另一个思路操作了!所以条件变量的notify() 也是对全局所有的等待的线程的起作用的,当然,有通知的数据量的限制,也并非是全局。)
import threading
from threading import Condition
# 服务员
class Fuwuyuan(threading.Thread):
def __init__(self, cond):
self.cond = cond
super().__init__(name="fuwuyuan")
def run(self):
print("开门!,等客人进门先!")
self.cond.acquire()
self.cond.wait()
print('2:{}:客人你好,有什么可以为你服务的吗?. '.format(self.name))
self.cond.notify()
self.cond.wait()
print('4:{}:一个人那么能吃啊!. '.format(self.name))
print('5:{}:没带女朋友来一起吗? '.format(self.name))
self.cond.notify()
self.cond.wait()
print('7:{}:呵呵呵,不送再见! '.format(self.name))
self.cond.notify()
# self.cond.wait()
self.cond.release()
class Xiaofeizhe(threading.Thread):
def __init__(self, cond):
super().__init__(name="xiaofeizhe")
self.cond = cond
def run(self):
print("进门!喊一个可漂亮的客服美女姐姐过来先!")
self.cond.acquire()
print('1:{}:服务员. '.format(self.name))
self.cond.notify()
self.cond.wait()
print('3:{}:我要吃一个鸡腿!! . '.format(self.name))
self.cond.notify()
self.cond.wait()
print('6:{}:一个人咋滴,没妹子啊!要不你来当我妹子! . '.format(self.name))
self.cond.notify()
# self.cond.wait()
self.cond.release()
if __name__ == '__main__':
condition = Condition()
fuwyuan = Fuwuyuan(condition)
xiaofeizhe = Xiaofeizhe(condition)
# 先启动麦当劳开门先!
fuwyuan.start()
# 客人进门点餐
xiaofeizhe.start()
输出结果:
开门!,等客人进门先!
进门!喊一个可漂亮的客服美女姐姐过来先!
1:xiaofeizhe:服务员.
2:fuwuyuan:客人你好,有什么可以为你服务的吗?.
3:xiaofeizhe:我要吃一个鸡腿!! .
4:fuwuyuan:一个人那么能吃啊!.
5:fuwuyuan:没带女朋友来一起吗?
6:xiaofeizhe:一个人咋滴,没妹子啊!要不你来当我妹子! .
7:fuwuyuan:呵呵呵,不送再见!
使用with方式进行上锁和释放锁:
import threading
from threading import Condition
# 服务员
class Fuwuyuan(threading.Thread):
def __init__(self, cond):
self.cond = cond
super().__init__(name="fuwuyuan")
def run(self):
print("开门!,等客人进门先!")
# self.cond.acquire()
with self.cond:
self.cond.wait() # 等待消费者的唤醒通知
print('2:{}:来了!客人你好,有什么可以为你服务的吗?. '.format(self.name))
self.cond.notify() # 通知消费者进行点餐的操作
self.cond.wait()
print('4:{}:一个人那么能吃啊!. '.format(self.name))
print('5:{}:没带女朋友来一起吗? '.format(self.name))
self.cond.notify()
self.cond.wait()
print('7:{}:呵呵呵,不送再见! '.format(self.name))
self.cond.notify()
# self.cond.wait()
class Xiaofeizhe(threading.Thread):
def __init__(self, cond,name):
super().__init__(name=name)
self.cond = cond
def run(self):
print("进门!喊一个可漂亮的客服美女姐姐过来先!")
self.cond.acquire()
print('1:{}:来一个服务员服务员. '.format(self.name))
self.cond.notify()# 通知服务员,解除服务员的等待状态
self.cond.wait() # 消费者自己等服务员回应
print('3:{}:我要吃一个鸡腿!! . '.format(self.name))
self.cond.notify()
self.cond.wait()
print('6:{}:一个人咋滴,没妹子啊!要不你来当我妹子! . '.format(self.name))
self.cond.notify()
# self.cond.wait()
self.cond.release()
if __name__ == '__main__':
condition = Condition()
fuwyuan = Fuwuyuan(condition)
xiaofeizhe = Xiaofeizhe(condition,'大哥大')
# 先启动麦当劳开门先!
fuwyuan.start()
# 客人进门点餐
xiaofeizhe.start()
注意点:
1:使用条件变量,第一步是先上锁最后的异步是记得要解锁,也可以使用(with self.condition )来自动的上锁和释放锁 2:只有上锁后才可以进行等待或通知的操作。 3:因为condition的本质还是斥锁Lock和可重入锁RLock,所有上锁和解锁必须是同事的存在额操作
应用场合(在线程安装的队列中的使用)
class Queue:
'''Create a queue object with a given maximum size.
If maxsize is <= 0, the queue size is infinite.
'''
def __init__(self, maxsize=0):
self.maxsize = maxsize
self._init(maxsize)
# mutex must be held whenever the queue is mutating. All methods
# that acquire mutex must release it before returning. mutex
# is shared between the three conditions, so acquiring and
# releasing the conditions also acquires and releases mutex.
self.mutex = threading.Lock()
# Notify not_empty whenever an item is added to the queue; a
# thread waiting to get is notified then.
self.not_empty = threading.Condition(self.mutex)
# Notify not_full whenever an item is removed from the queue;
# a thread waiting to put is notified then.
self.not_full = threading.Condition(self.mutex)
# Notify all_tasks_done whenever the number of unfinished tasks
# drops to zero; thread waiting to join() is notified to resume
self.all_tasks_done = threading.Condition(self.mutex)
self.unfinished_tasks = 0
def task_done(self):
'''Indicate that a formerly enqueued task is complete.
Used by Queue consumer threads. For each get() used to fetch a task,
a subsequent call to task_done() tells the queue that the processing
on the task is complete.
If a join() is currently blocking, it will resume when all items
have been processed (meaning that a task_done() call was received
for every item that had been put() into the queue).
Raises a ValueError if called more times than there were items
placed in the queue.
'''
with self.all_tasks_done:
unfinished = self.unfinished_tasks - 1
if unfinished <= 0:
if unfinished < 0:
raise ValueError('task_done() called too many times')
self.all_tasks_done.notify_all()
self.unfinished_tasks = unfinished
def join(self):
'''Blocks until all items in the Queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the
queue. The count goes down whenever a consumer thread calls task_done()
to indicate the item was retrieved and all work on it is complete.
When the count of unfinished tasks drops to zero, join() unblocks.
'''
with self.all_tasks_done:
while self.unfinished_tasks:
self.all_tasks_done.wait()
def qsize(self):
'''Return the approximate size of the queue (not reliable!).'''
with self.mutex:
return self._qsize()
def empty(self):
'''Return True if the queue is empty, False otherwise (not reliable!).
This method is likely to be removed at some point. Use qsize() == 0
as a direct substitute, but be aware that either approach risks a race
condition where a queue can grow before the result of empty() or
qsize() can be used.
To create code that needs to wait for all queued tasks to be
completed, the preferred technique is to use the join() method.
'''
with self.mutex:
return not self._qsize()
def full(self):
'''Return True if the queue is full, False otherwise (not reliable!).
This method is likely to be removed at some point. Use qsize() >= n
as a direct substitute, but be aware that either approach risks a race
condition where a queue can shrink before the result of full() or
qsize() can be used.
'''
with self.mutex:
return 0 < self.maxsize <= self._qsize()
def put(self, item, block=True, timeout=None):
'''Put an item into the queue.
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until a free slot is available. If 'timeout' is
a non-negative number, it blocks at most 'timeout' seconds and raises
the Full exception if no free slot was available within that time.
Otherwise ('block' is false), put an item on the queue if a free slot
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).
'''
with self.not_full:
if self.maxsize > 0:
if not block:
if self._qsize() >= self.maxsize:
raise Full
elif timeout is None:
while self._qsize() >= self.maxsize:
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = time() + timeout
while self._qsize() >= self.maxsize:
remaining = endtime - time()
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()
def get(self, block=True, timeout=None):
'''Remove and return an item from the queue.
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until an item is available. If 'timeout' is
a non-negative number, it blocks at most 'timeout' seconds and raises
the Empty exception if no item was available within that time.
Otherwise ('block' is false), return an item if one is immediately
available, else raise the Empty exception ('timeout' is ignored
in that case).
'''
with self.not_empty:
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = time() + timeout
while not self._qsize():
remaining = endtime - time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
item = self._get()
self.not_full.notify()
return item
def put_nowait(self, item):
'''Put an item into the queue without blocking.
Only enqueue the item if a free slot is immediately available.
Otherwise raise the Full exception.
'''
return self.put(item, block=False)
def get_nowait(self):
'''Remove and return an item from the queue without blocking.
Only get an item if one is immediately available. Otherwise
raise the Empty exception.
'''
return self.get(block=False)
# Override these methods to implement other queue organizations
# (e.g. stack or priority queue).
# These will only be called with appropriate locks held
# Initialize the queue representation
def _init(self, maxsize):
self.queue = deque()
def _qsize(self):
return len(self.queue)
# Put a new item in the queue
def _put(self, item):
self.queue.append(item)
# Get an item from the queue
def _get(self):
return self.queue.popleft()
基于threading.Condition()设计一个简单线程安全队列
线程安全队列:
多线程的操作需要保持数据一致和数据安全(保证多个线程获取的串行)
#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
文件名称 : 线程安全队列
文件功能描述 : 功能描述
创建人 : 小钟同学
"""
import time
import threading
class ThreadSafeQueue:
def __init__(self, max_size=0):
# 定义存储的元素列表
self.queue = []
# 最大的数据
self.max_size = max_size
# 线程互斥锁,保证多线程对列表操作的串行化
self.lock = threading.Lock()
# 定义的条件变量对象
self.condition = threading.Condition()
# 获取当前队列元素的数量--获取的过程也是进行互斥锁的
def size(self):
with self.lock:
size = len(self.queue)
return size
# 从队列里面获取元素
def get(self, index):
with self.lock:
item = self.queue[index]
return item
# 往队列里面放入元素
def put(self, item):
if self.max_size != 0 and self.size() > self.max_size:
return "队列已经满了!"
# 进行串行的添加对象
with self.lock:
print('写入元素: %s' % item)
self.queue.append(item)
print("当前队列的元素:", self.queue)
# 唤醒其他线程继续执行
with self.condition:
self.condition.notify()
# 批量插入元素
def batch_put(self, item_list):
for item in item_list:
self.put(item)
# 从队列取出元素block-是否需要阻塞的等待
def pop(self, block=True, timeout=None):
if self.size() == 0:
# 需要阻塞等待
if block:
with self.condition:
self.condition.wait(timeout=timeout)
else:
return None
# 进行安全的获取原始--加锁
with self.lock:
item = None
if len(self.queue) > 0:
item = self.queue.pop()
print("=====>" * 10)
print('从队列中获取元素: %s' % item)
print("=====>" * 10)
return item
class Producer(threading.Thread):
def run(self):
ssds = "你是一个大爷"
import random
while True:
item = random.choice(ssds)
queue.put(item)
time.sleep(1)
class Consumer(threading.Thread):
def run(self):
while True:
item = queue.pop(block=False, timeout=-1)
time.sleep(1)
if __name__ == '__main__':
queue = ThreadSafeQueue(max_size=100)
producer = Producer()
producer2 = Producer()
consumer = Consumer()
producer.start()
producer2.start()
consumer.start()
producer.join()
producer2.join()
consumer.join()
输出结果:
写入元素: 一
当前队列的元素: ['一']
写入元素: 是
当前队列的元素: ['一', '是']
=====>=====>=====>=====>=====>=====>=====>=====>=====>=====>
从队列中获取元素: 是
=====>=====>=====>=====>=====>=====>=====>=====>=====>=====>
写入元素: 一
当前队列的元素: ['一', '一']
写入元素: 你
当前队列的元素: ['一', '一', '你']
=====>=====>=====>=====>=====>=====>=====>=====>=====>=====>
从队列中获取元素: 你
=====>=====>=====>=====>=====>=====>=====>=====>=====>=====>
写入元素: 爷
当前队列的元素: ['一', '一', '爷']
写入元素: 是
当前队列的元素: ['一', '一', '爷', '是']
=====>=====>=====>=====>=====>=====>=====>=====>=====>=====>
从队列中获取元素: 是
=====>=====>=====>=====>=====>=====>=====>=====>=====>=====>
写入元素: 是
当前队列的元素: ['一', '一', '爷', '是']
写入元素: 是
当前队列的元素: ['一', '一', '爷', '是', '是']
=====>=====>=====>=====>=====>=====>=====>=====>=====>=====>
从队列中获取元素: 是
=====>=====>=====>=====>=====>=====>=====>=====>=====>=====>
写入元素: 是
当前队列的元素: ['一', '一', '爷', '是', '是']
写入元素: 大
当前队列的元素: ['一', '一', '爷', '是', '是', '大']
=====>=====>=====>=====>=====>=====>=====>=====>=====>=====>
从队列中获取元素: 大
=====>=====>=====>=====>=====>=====>=====>=====>=====>=====>
写入元素: 个
当前队列的元素: ['一', '一', '爷', '是', '是', '个']
写入元素: 个
当前队列的元素: ['一', '一', '爷', '是', '是', '个', '个']
=====>=====>=====>=====>=====>=====>=====>=====>=====>=====>
从队列中获取元素: 个
=====>=====>=====>=====>=====>=====>=====>=====>=====>=====>
写入元素: 你
当前队列的元素: ['一', '一', '爷', '是', '是', '个', '你']
写入元素: 一
当前队列的元素: ['一', '一', '爷', '是', '是', '个', '你', '一']
=====>=====>=====>=====>=====>=====>=====>=====>=====>=====>
从队列中获取元素: 一
=====>=====>=====>=====>=====>=====>=====>=====>=====>=====>
写入元素: 大
当前队列的元素: ['一', '一', '爷', '是', '是', '个', '你', '大']
写入元素: 爷
当前队列的元素: ['一', '一', '爷', '是', '是', '个', '你', '大', '爷']
=====>=====>=====>=====>=====>=====>=====>=====>=====>=====>
从队列中获取元素: 爷
=====>=====>=====>=====>=====>=====>=====>=====>=====>=====>
写入元素: 个
当前队列的元素: ['一', '一', '爷', '是', '是', '个', '你', '大', '个']
写入元素: 爷
当前队列的元素: ['一', '一', '爷', '是', '是', '个', '你', '大', '个', '爷']
=====>=====>=====>=====>=====>=====>=====>=====>=====>=====>
从队列中获取元素: 爷
=====>=====>=====>=====>=====>=====>=====>=====>=====>=====>
写入元素: 一
当前队列的元素: ['一', '一', '爷', '是', '是', '个', '你', '大', '个', '一']
写入元素: 是
当前队列的元素: ['一', '一', '爷', '是', '是', '个', '你', '大', '个', '一', '是']
基于threading.Event()设计一个简单线程安全队列
import time
import threading
class ThreadSafeQueue:
def __init__(self, max_size=0):
# 定义存储的元素列表
self.queue = []
# 最大的数据
self.max_size = max_size
# 线程互斥锁,保证多线程对列表操作的串行化
self.lock = threading.Lock()
# 定义的条件变量对象
# self.condition = threading.Condition()
self._event = threading.Event()
# 获取当前队列元素的数量--获取的过程也是进行互斥锁的
def size(self):
with self.lock:
size = len(self.queue)
return size
# 从队列里面获取元素
def get(self, index):
with self.lock:
item = self.queue[index]
return item
# 往队列里面放入元素
def put(self, item):
if self.max_size != 0 and self.size() > self.max_size:
return "队列已经满了!"
# 进行串行的添加对象
with self.lock:
print('写入元素: %s' % item)
self.queue.append(item)
print("当前队列的元素:", self.queue)
# 唤醒其他线程继续执行
if len(self.queue) != 0:
self._event.clear()
while not self._event.is_set():
self._event.set()
# 批量插入元素
def batch_put(self, item_list):
for item in item_list:
self.put(item)
# 从队列取出元素block-是否需要阻塞的等待
def pop(self, block=True, timeout=None):
if self.size() == 0:
# 需要阻塞等待
if block:
if self._event.is_set():
self._event.wait()
else:
return None
# 进行安全的获取原始--加锁
with self.lock:
item = None
if len(self.queue) > 0:
item = self.queue.pop()
print("=====>" * 10)
print('从队列中获取元素: %s' % item)
print("=====>" * 10)
return item
class Producer(threading.Thread):
def run(self):
ssds = "你是一个大爷"
import random
while True:
item = random.choice(ssds)
queue.put(item)
time.sleep(1)
class Consumer(threading.Thread):
def run(self):
while True:
item = queue.pop(block=False, timeout=-1)
time.sleep(1)
if __name__ == '__main__':
queue = ThreadSafeQueue(max_size=100)
producer = Producer()
producer2 = Producer()
consumer = Consumer()
producer.start()
producer2.start()
consumer.start()
producer.join()
producer2.join()
consumer.join()
输出结果:
写入元素: 个
当前队列的元素: ['个']
写入元素: 大
当前队列的元素: ['个', '大']
=====>=====>=====>=====>=====>=====>=====>=====>=====>=====>
从队列中获取元素: 大
=====>=====>=====>=====>=====>=====>=====>=====>=====>=====>
写入元素: 个
当前队列的元素: ['个', '个']
写入元素: 大
当前队列的元素: ['个', '个', '大']
=====>=====>=====>=====>=====>=====>=====>=====>=====>=====>
从队列中获取元素: 大
=====>=====>=====>=====>=====>=====>=====>=====>=====>=====>
参考资料:
https://blog.csdn.net/ckk727/article/details/99950843
个人其他博客地址
简书:https://www.jianshu.com/u/d6960089b087
掘金:https://juejin.cn/user/2963939079225608
小钟同学 | 文 【原创】| QQ:308711822
1:本文相关描述主要是个人的认知和见解,如有不当之处,还望各位大佬指正。 2:关于文章内容,有部分内容参考自互联网整理,如有链接会声明标注;如没有及时标注备注的链接的,如有侵权请联系,我会立即删除处理哟。




