通常我们的业务中有使用到关于锁的一些使用,其实这个老生常谈,但是刚好也有别人问起,我也水文一下,纯属瞎扯一下!
关于分布式锁,大概已是人人皆知,目前最大概最流程的都是基于redis下来解决的吧,由于我的对ZK还不熟,所以暂时不会去考虑这个。这里主要是推演一下关于分布式锁的出现几种发展过程。
1 关于锁要解决的几个问题点?
通常的锁的引入都是为了确保同一时间内,有且只能有一个命令的执行是有效的,避免数据的执行同时并发数据异常,出现不一致性。
所以锁的引入一般主要考虑的问题其实是:
操作命令的原子性 数据一致性 数据的完全性
有些时候使用锁其实还有其他的好处,比如我们的启动一些服务的时候,不能同时启动多个实例的之类的额,使用锁其实是有必要。
当然锁的使用,其实要考虑的是和你业务有关的,而锁的实现的方式则多样:
文件锁(文件句柄占用互斥) 进程锁(socker端口占用互斥) 线程锁 分布式锁
关于数据库事务一些说明:
1:事务可以一次执行多个命令
2:事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。
3:事务在执行的过>程中,不会被其他客户端发送来的命令请求所打断。
4:事务是一个原子操作:事务中的命令要么全部被执行,要么全部都不执行。
分布式锁需要考虑解决的问题:
锁安全性问题--不能被别人删除 锁过期性问题,不能永久上锁,避免死锁 锁过期引发的无法删除锁,业务逻辑执行时间和锁过期时间不一致的,锁续租问题 高可用情况的锁的同步过程中时差问题
本笔记梳理的内容概要:
单进程服务下一个库存扣减问题 引入数据库锁中非乐观锁机制解决多进程或多线程下的并发库扣减问题 引入redis简单的锁扣减问题的引发其他问题的思考 redis锁中的get和set一致性问题思考 reids上锁的时候setnx和set(xxx,nx=true,ex=10)的原子操作使用 redis下利用WATCH事务机制来实现锁的释放和上锁的机制 引入lua脚步确保执行的操作原子性,还有续租问题。
2:问题演示前提准备
1:数据库使用的POSTGRESQL

2:库存表的创建:

peewee模型创建:
from peewee import *
# database = PostgresqlDatabase('zyxadminsystem', **{'host': 'localhost', 'port': 5432, 'user': 'postgres', 'password': '123456'})
from apps.modules.hanxuan_api_online.models.manage_pg_peewee import database, session_scope, session_scope_atomic
from peewee import *
from playhouse.shortcuts import model_to_dict, dict_to_model
# 自定义的
import json
class JSONField(TextField):
def db_value(self, value):
return json.dumps(value)
def python_value(self, value):
if value is not None:
return json.loads(value)
class UnknownField(object):
def __init__(self, *_, **__): pass
class BaseModel(Model):
class Meta:
database = database
class Stocks(BaseModel):
name = TextField(constraints=[SQL("DEFAULT ''::text")], index=True, null=True)
stokenum = IntegerField(null=True)
class Meta:
table_name = 'stocks'
if __name__ == '__main__':
Stocks.create(name='华为手机',stokenum=100)
2 超卖问题分析
2.1 问题场景:
前提:单机单体应用的情况下 问题描述:多线程并发下库存扣减引发的库存扣减不一致 问题示例代码:
单线程下执行:
def ordercreat(num):
with database.atomic():
# 库存剩余查询
result = Stocks.get_or_none(Stocks.name == '苹果手机')
if result and result.stokenum > 0:
print("开始i执行库存扣减之前,查询到的库存数是:", result.stokenum)
result.stokenum -= num
result.save()
result = Stocks.get_or_none(Stocks.name == '苹果手机')
print("扣减之后,查询到的库存数是:", result.stokenum)
else:
print("库存不足")
database.rollback()
if __name__ == '__main__':
ordercreat(10)
上面的代码单线程下执行:
开始i执行库存扣减之前,查询到的库存数是:100
扣减之后,查询到的库存数是:90
多线程下执行:
from peewee import *
# database = PostgresqlDatabase('zyxadminsystem', **{'host': 'localhost', 'port': 5432, 'user': 'postgres', 'password': '123456'})
from apps.modules.hanxuan_api_online.models.manage_pg_peewee import database, session_scope, session_scope_atomic
from peewee import *
from playhouse.shortcuts import model_to_dict, dict_to_model
# 自定义的
import json
class JSONField(TextField):
def db_value(self, value):
return json.dumps(value)
def python_value(self, value):
if value is not None:
return json.loads(value)
class UnknownField(object):
def __init__(self, *_, **__): pass
class BaseModel(Model):
class Meta:
database = database
class Stocks(BaseModel):
name = TextField(constraints=[SQL("DEFAULT ''::text")], index=True, null=True)
stokenum = IntegerField(null=True)
class Meta:
table_name = 'stocks'
def ordercreat(num):
with database.atomic():
# 库存剩余查询
for paynum in range(3):
print("当前执行的线程是:", threading.current_thread().name)
result = Stocks.get_or_none(Stocks.name == '苹果手机')
if result and result.stokenum > 0:
print("开始i执行库存扣减之前,查询到的库存数是:", result.stokenum)
result.stokenum -= num
result.save()
result = Stocks.get_or_none(Stocks.name == '苹果手机')
print("扣减之后,查询到的库存数是:", result.stokenum)
print("=======》:"*5)
else:
print("库存不足")
database.rollback()
if __name__ == '__main__':
import threading
liststh = []
for i in range(15):
t = threading.Thread(name='name-t' + str(i), target=ordercreat, args=(10,))
t.start()
liststh.append(t)
for i in liststh:
i.join()
print("主线程结束!")
执行输出:

从上面的所处的中可以看出问题很明显,再多进程的情况下,库存的读取的数据是有问题的!这个很吃亏的!如果你使用这个方式来处理的,有可能多线程的情况,获取到的是同一个的数据!
2.2 加线程锁处理?但是不是分布式锁啊!
首先我们的强调的,我们的当前的示例是处于:单体单机单进程下
PS:这种加锁的情况下,基本的上线程的执行就是类似需要线性的执行了!!因为需要等待解锁的操作了!!!
示例代码:
from peewee import *
# database = PostgresqlDatabase('zyxadminsystem', **{'host': 'localhost', 'port': 5432, 'user': 'postgres', 'password': '123456'})
from apps.modules.hanxuan_api_online.models.manage_pg_peewee import database, session_scope, session_scope_atomic
from peewee import *
from playhouse.shortcuts import model_to_dict, dict_to_model
# 自定义的
import json
class JSONField(TextField):
def db_value(self, value):
return json.dumps(value)
def python_value(self, value):
if value is not None:
return json.loads(value)
class UnknownField(object):
def __init__(self, *_, **__): pass
class BaseModel(Model):
class Meta:
database = database
class Stocks(BaseModel):
name = TextField(constraints=[SQL("DEFAULT ''::text")], index=True, null=True)
stokenum = IntegerField(null=True)
class Meta:
table_name = 'stocks'
import threading
# 全局锁
globalthreading_lock = threading.Lock()
def ordercreat(num):
with database.atomic():
# 库存剩余查询
globalthreading_lock.acquire()
for paynum in range(3):
print("操作库存前进行上锁:",threading.current_thread().name)
print("当前执行的线程是:", threading.current_thread().name)
result = Stocks.get_or_none(Stocks.name == '苹果手机')
if result and result.stokenum > 0:
print("开始i执行库存扣减之前,查询到的库存数是:", '线程名称:',threading.current_thread().name,"库存:",result.stokenum)
result.stokenum -= num
result.save()
result = Stocks.get_or_none(Stocks.name == '苹果手机')
print("扣减之后,查询到的库存数是:", '线程名称:',threading.current_thread().name,"库存:",result.stokenum)
print("=======》:"*5)
else:
print("库存不足")
database.rollback()
# 开始释放锁!
globalthreading_lock.release()
print("开始释放锁:", threading.current_thread().name)
if __name__ == '__main__':
liststh = []
for i in range(15):
t = threading.Thread(name='name-t' + str(i), target=ordercreat, args=(10,))
t.start()
liststh.append(t)
for i in liststh:
i.join()
print("主线程结束!")
最终输出的代码的示例:

PS:上面的加锁问题,仅仅是基于单机单体单进程的情况下?但是如果假设你的是多进程的部署的服务的呢?甚至你的是微服务的形式,多机部署的情况下呢?所以还是需要进一步考虑解决分布式锁的问题?
ps:其实上面的示例在库存更新的时候,还存在一个原子性得到问题,在PEEWEE建议使用API来update来执行更新,
2.3 数据库锁,乐观锁?
其实基于数据库锁的这种情况,我觉得,既然你既然涉及到并发了,但是数据库加锁的话,那你的并发都会全部的阻塞了,还还谈什么并发呢?高并发的情况下,使用数据库锁,那估计不太好吧!估计你的数据库会挂的很快得吧!!!!
但是数据库锁情况,也要考虑你连接是不是同一个数据库,同一个数据库的话,还好!如果不是呢?
在数据库中通常有锁的有:
悲观锁(进行数据库表上锁,容易出现并发性能瓶颈问题,一般不考虑了吧,既然谈到并发了,就不要锁表了) 乐观锁(业务层去处理数据库,如果不是超高的并发情况下,可以考虑)
谈谈乐观锁:
给库存表新增一个版本字段:



乐观锁的逻辑思路:
执行查询数据库信息获取库存信息,获取到当前查询的版本号信息 再出来完成库存扣减完成的时候,通过查询当前线程获取到的版本的记录来更新记录 依靠版本号不同的条件来限制数据的更新失败来处理(利用数据库更新的执行命令的原子性来互斥数据更新)
具体示例:
from peewee import *
# database = PostgresqlDatabase('zyxadminsystem', **{'host': 'localhost', 'port': 5432, 'user': 'postgres', 'password': '123456'})
from apps.modules.hanxuan_api_online.models.manage_pg_peewee import database, session_scope, session_scope_atomic
from peewee import *
from playhouse.shortcuts import model_to_dict, dict_to_model
# 自定义的
import json
class JSONField(TextField):
def db_value(self, value):
return json.dumps(value)
def python_value(self, value):
if value is not None:
return json.loads(value)
class UnknownField(object):
def __init__(self, *_, **__): pass
class BaseModel(Model):
class Meta:
database = database
class Stocks(BaseModel):
name = TextField(constraints=[SQL("DEFAULT ''::text")], index=True, null=True)
stokenum = IntegerField(null=True)
version = IntegerField(null=True, default=0)
class Meta:
table_name = 'stocks'
import threading
# 全局锁
globalthreading_lock = threading.Lock()
def ordercreat(num):
with database.atomic():
for paynum in range(3):
print("操作库存前进行上锁:", threading.current_thread().name)
print("当前执行的线程是:", threading.current_thread().name)
result = Stocks.get_or_none(Stocks.name == '苹果手机')
if result and result.stokenum > 0:
print("开始i执行库存扣减之前,查询到的库存数是:", '线程名称:', threading.current_thread().name, "库存:", result.stokenum,
'版本信息:', result.version)
# 保证更新的原子性-,保证执行到底
_updatarresult = Stocks.update(stokenum=Stocks.stokenum - num, version=Stocks.version + 1) \
.where(Stocks.name == '苹果手机', Stocks.version == result.version) \
.execute()
if _updatarresult:
result = Stocks.get_or_none(Stocks.name == '苹果手机')
print("更新成功!,扣减之后,查询到的库存数是:", '线程名称:', threading.current_thread().name, "库存:", result.stokenum,
'版本信息:',
result.version)
else:
print("更新失败!!", threading.current_thread().name, "库存:", result.stokenum, '版本信息:', result.version)
print("=======》:" * 5)
else:
print("库存不足")
database.rollback()
if __name__ == '__main__':
liststh = []
for i in range(15):
t = threading.Thread(name='name-t' + str(i), target=ordercreat, args=(10,))
t.start()
liststh.append(t)
for i in liststh:
i.join()
print("主线程结束!")
关键的地方是:

基于数据库更新的时候原子性更新机制!如果更新失败,可以进行重试处理!
执行输出:
操作库存前进行上锁:name-t0
当前执行的线程是:name-t0
开始i执行库存扣减之前,查询到的库存数是: 线程名称:name-t0 库存:100 版本信息: 0
操作库存前进行上锁:name-t1
当前执行的线程是:name-t1
更新成功!,扣减之后,查询到的库存数是: 线程名称:name-t0 库存:90 版本信息:1
=======》:=======》:=======》:=======》:=======》:
操作库存前进行上锁:name-t0
当前执行的线程是:name-t0
开始i执行库存扣减之前,查询到的库存数是: 线程名称:name-t1 库存:100 版本信息: 0
开始i执行库存扣减之前,查询到的库存数是: 线程名称:name-t0 库存:90 版本信息:1
操作库存前进行上锁:name-t2
当前执行的线程是:name-t2
更新成功!,扣减之后,查询到的库存数是: 线程名称:name-t0 库存:80 版本信息:2
=======》:=======》:=======》:=======》:=======》:
操作库存前进行上锁:name-t0
当前执行的线程是:name-t0
开始i执行库存扣减之前,查询到的库存数是: 线程名称:name-t0 库存:80 版本信息:2
开始i执行库存扣减之前,查询到的库存数是: 线程名称:name-t2 库存:100 版本信息: 0
更新成功!,扣减之后,查询到的库存数是: 线程名称:name-t0 库存:70 版本信息:3
=======》:=======》:=======》:=======》:=======》:
操作库存前进行上锁:name-t3
当前执行的线程是:name-t3
更新失败!!name-t1 库存:100 版本信息: 0
=======》:=======》:=======》:=======》:=======》:
操作库存前进行上锁:name-t1
当前执行的线程是:name-t1
开始i执行库存扣减之前,查询到的库存数是: 线程名称:name-t1 库存:70 版本信息:3
开始i执行库存扣减之前,查询到的库存数是: 线程名称:name-t3 库存:70 版本信息:3
更新成功!,扣减之后,查询到的库存数是: 线程名称:name-t1 库存:60 版本信息:4
=======》:=======》:=======》:=======》:=======》:
操作库存前进行上锁:name-t1
当前执行的线程是:name-t1
操作库存前进行上锁:name-t4
当前执行的线程是:name-t4
开始i执行库存扣减之前,查询到的库存数是: 线程名称:name-t1 库存:60 版本信息:4
开始i执行库存扣减之前,查询到的库存数是: 线程名称:name-t4 库存:70 版本信息:3
更新成功!,扣减之后,查询到的库存数是: 线程名称:name-t1 库存:50 版本信息:5
=======》:=======》:=======》:=======》:=======》:
更新失败!!name-t2 库存:100 版本信息: 0
=======》:=======》:=======》:=======》:=======》:
操作库存前进行上锁:name-t2
当前执行的线程是:name-t2
操作库存前进行上锁:name-t5
当前执行的线程是:name-t5
开始i执行库存扣减之前,查询到的库存数是: 线程名称:name-t2 库存:50 版本信息:5
开始i执行库存扣减之前,查询到的库存数是: 线程名称:name-t5 库存:50 版本信息:5
更新成功!,扣减之后,查询到的库存数是: 线程名称:name-t2 库存:40 版本信息:6
=======》:=======》:=======》:=======》:=======》:
操作库存前进行上锁:name-t2
当前执行的线程是:name-t2
开始i执行库存扣减之前,查询到的库存数是: 线程名称:name-t2 库存:40 版本信息:6
操作库存前进行上锁:name-t6
当前执行的线程是:name-t6
更新成功!,扣减之后,查询到的库存数是: 线程名称:name-t2 库存:30 版本信息:7
=======》:=======》:=======》:=======》:=======》:
开始i执行库存扣减之前,查询到的库存数是: 线程名称:name-t6 库存:50 版本信息:5
更新失败!!name-t3 库存:70 版本信息:3
=======》:=======》:=======》:=======》:=======》:
操作库存前进行上锁:name-t3
当前执行的线程是:name-t3
更新失败!!name-t6 库存:50 版本信息:5
=======》:=======》:=======》:=======》:=======》:
操作库存前进行上锁:name-t6
当前执行的线程是:name-t6
开始i执行库存扣减之前,查询到的库存数是: 线程名称:name-t3 库存:30 版本信息:7
开始i执行库存扣减之前,查询到的库存数是: 线程名称:name-t6 库存:30 版本信息:7
操作库存前进行上锁:name-t7
当前执行的线程是:name-t7
更新成功!,扣减之后,查询到的库存数是: 线程名称:name-t3 库存:20 版本信息:8
=======》:=======》:=======》:=======》:=======》:
操作库存前进行上锁:name-t3
当前执行的线程是:name-t3
开始i执行库存扣减之前,查询到的库存数是: 线程名称:name-t7 库存:30 版本信息:7
开始i执行库存扣减之前,查询到的库存数是: 线程名称:name-t3 库存:20 版本信息:8
操作库存前进行上锁:name-t8
当前执行的线程是:name-t8
更新成功!,扣减之后,查询到的库存数是: 线程名称:name-t3 库存:10 版本信息:9
=======》:=======》:=======》:=======》:=======》:
开始i执行库存扣减之前,查询到的库存数是: 线程名称:name-t8 库存:30 版本信息:7
更新失败!!name-t4 库存:70 版本信息:3
=======》:=======》:=======》:=======》:=======》:
操作库存前进行上锁:name-t4
当前执行的线程是:name-t4
更新失败!!name-t8 库存:30 版本信息:7
=======》:=======》:=======》:=======》:=======》:
操作库存前进行上锁:name-t8
当前执行的线程是:name-t8
开始i执行库存扣减之前,查询到的库存数是: 线程名称:name-t4 库存:10 版本信息:9
操作库存前进行上锁:name-t9
当前执行的线程是:name-t9
开始i执行库存扣减之前,查询到的库存数是: 线程名称:name-t8 库存:10 版本信息:9
更新成功!,扣减之后,查询到的库存数是: 线程名称:name-t4 库存: 0 版本信息:10
=======》:=======》:=======》:=======》:=======》:
操作库存前进行上锁:name-t4
当前执行的线程是:name-t4
开始i执行库存扣减之前,查询到的库存数是: 线程名称:name-t9 库存:10 版本信息:9
库存不足
更新失败!!name-t5 库存:50 版本信息:5
=======》:=======》:=======》:=======》:=======》:
操作库存前进行上锁:name-t5
当前执行的线程是:name-t5
操作库存前进行上锁:name-t10
当前执行的线程是:name-t10
开始i执行库存扣减之前,查询到的库存数是: 线程名称:name-t5 库存:10 版本信息:9
开始i执行库存扣减之前,查询到的库存数是: 线程名称:name-t10 库存:10 版本信息:9
更新成功!,扣减之后,查询到的库存数是: 线程名称:name-t5 库存: 0 版本信息:10
=======》:=======》:=======》:=======》:=======》:
操作库存前进行上锁:name-t5
当前执行的线程是:name-t5
操作库存前进行上锁:name-t11
当前执行的线程是:name-t11
库存不足
更新失败!!name-t6 库存:30 版本信息:7
=======》:=======》:=======》:=======》:=======》:
操作库存前进行上锁:name-t6
当前执行的线程是:name-t6
开始i执行库存扣减之前,查询到的库存数是: 线程名称:name-t11 库存:10 版本信息:9
开始i执行库存扣减之前,查询到的库存数是: 线程名称:name-t6 库存:10 版本信息:9
操作库存前进行上锁:name-t12
当前执行的线程是:name-t12
更新成功!,扣减之后,查询到的库存数是: 线程名称:name-t6 库存: 0 版本信息:10
=======》:=======》:=======》:=======》:=======》:
更新失败!!name-t7 库存:30 版本信息:7
=======》:=======》:=======》:=======》:=======》:
操作库存前进行上锁:name-t7
当前执行的线程是:name-t7
库存不足
操作库存前进行上锁:name-t12
当前执行的线程是:name-t12
库存不足
操作库存前进行上锁:name-t13
当前执行的线程是:name-t13
更新失败!!name-t8 库存:10 版本信息:9
=======》:=======》:=======》:=======》:=======》:
操作库存前进行上锁:name-t8
当前执行的线程是:name-t8
操作库存前进行上锁:name-t7
当前执行的线程是:name-t7
库存不足
库存不足
库存不足
操作库存前进行上锁:name-t12
当前执行的线程是:name-t12
更新失败!!name-t9 库存:10 版本信息:9
=======》:=======》:=======》:=======》:=======》:
操作库存前进行上锁:name-t9
当前执行的线程是:name-t9
库存不足
操作库存前进行上锁:name-t13
当前执行的线程是:name-t13
库存不足
操作库存前进行上锁:name-t14
当前执行的线程是:name-t14
库存不足
操作库存前进行上锁:name-t9
当前执行的线程是:name-t9
更新失败!!name-t10 库存:10 版本信息:9
=======》:=======》:=======》:=======》:=======》:
操作库存前进行上锁:name-t10
当前执行的线程是:name-t10
库存不足
库存不足
库存不足
操作库存前进行上锁:name-t13
当前执行的线程是:name-t13
库存不足
更新失败!!操作库存前进行上锁:name-t11 库存: name-t1010 版本信息:9
=======》:=======》:=======》:=======》:=======》:
操作库存前进行上锁:name-t11
当前执行的线程是:
name-t11
当前执行的线程是:name-t10
操作库存前进行上锁:name-t14
当前执行的线程是:name-t14
库存不足
库存不足
操作库存前进行上锁:name-t11
当前执行的线程是:name-t11
库存不足
库存不足
操作库存前进行上锁:name-t14
当前执行的线程是:name-t14
库存不足
库存不足
主线程结束!
主要是观察失败得到地方:

2.4 redis分布式锁!
因为我们的数据库的悲观锁,始终依赖数据库的查询,数据库的查询我们都知道抗并发上肯定是不行的,所以为了更高的并发的话,那我们就需要利用相关的中间件组件来处理。甚至于我们的还可能会引入消息队列等。
分布式锁我们一般需要考虑的问题有:
锁具有互斥性 --- 有且只有一个锁存在 锁要具有安全--- 解锁还需上锁人的问题,不能被别人给删 死锁(当死锁之后,其他人就无法获取锁) 锁的时效性问题,每个锁都需要有过期的时间,避免死锁, 锁的过期性引发的关于锁时效和业务逻辑执行时效性的问题 多线程情况的锁的过期性引发锁被别人的线程给删除的问题 线程1:上锁----执行业务---超过锁机制---锁过期 线程2:因为线程锁过期,获取到锁---执行业务----还在执行的时候,被线程1业务执行完成后,给删除了锁!
下面简单一步一步的引发一些锁问题的思考:
那基于redis下的分布式锁可能会遇到什么问题?
简单锁实现:
class MyRedisLock:
def __init__(self, lockname):
self.lockname = lockname
self.redis_client = redis.Redis(host='127.0.0.1', password='123456')
def acquire(self):
# 如果没有获取到值----获取的锁值的情况,如果同时多线程获取到了,该怎么办?????
if not self.redis_client.get(self.lockname):
self.redis_client.set(self.lockname, 1)
return True,self
else:
# 一直想的循环等待获取锁
while True:
time.sleep(0.5)
if self.redis_client.get(self.lockname):
return True,self
def release(self):
# 如果没有获取到值
self.redis_client.delete(self.lockname)
return self
上面的锁示例,我们的
if not self.redis_client.get(self.lockname):
上面简单锁示例存在问题,
存在问题,如果多个线程同时进入到get的话,和数据库一样的一样的会遇到问题,所以我们需要保证get和set的原子性。保证这两个命令具备有事务性,必须一致的完成!
所以我们的需要改造我们的锁获取方式,修改为确保原子性:
self.redis_client.setnx(self.lockname,1):
是get 和set的结合体
另外还可以通过:来设置
self.redis_client.set(self.lockname,1,nx=True):
修改后:
确保上锁原子性实现:
class MyRedisLock:
def __init__(self, lockname):
self.lockname = lockname
self.redis_client = redis.Redis(host='127.0.0.1', password='123456')
def acquire(self):
# 如果没有获取到值----获取的锁值的情况,如果同时多线程获取到了,该怎么办?????
if self.redis_client.setnx(self.lockname,1):
return True,self
else:
# 一直想的循环等待获取锁
while True:
time.sleep(0.5)
if self.redis_client.setnx(self.lockname, 1):
return True,self
def release(self):
# 如果没有获取到值
self.redis_client.delete(self.lockname)
return self
上面的问题,我们我们可以处理关于set和get的原子性问题,进一步的考虑的话,考虑我们的锁不能给别人给删除的情况,这种我们的需要各位我们的锁进行唯一ID标记处理。
给锁做ID标记用于删除的时候避免误删实现:
import redis
import time
import uuid
class MyRedisLock:
def __init__(self, lockname):
self.lockerid = uuid.uuid4()
self.lockname = lockname
self.redis_client = redis.Redis(host='127.0.0.1', password='123456')
def acquire(self):
# 如果没有获取到值----获取的锁值的情况,如果同时多线程获取到了,该怎么办?????
# 设置锁为我们的锁ID,10秒过期
if self.redis_client.set(self.lockname, self.lockerid, nx=True, ex=10):
return True, self
else:
# 一直想的循环等待获取锁
while True:
time.sleep(0.5)
if self.redis_client.setnx(self.lockname, 1):
return True, self
def release(self):
# 如果没有获取到值---------------
lockerid = self.redis_client.get(self.lockname)
# 但是还是可能会遇到 的问题是:=======如果只是执行了?self.redis_client.get(self.lockname)就被打打断呢?所以这里两个命令的分开执行还是不具备原子操作
if lockerid == self.lockerid:
self.redis_client.delete(self.lockname)
return self
基于reids事务基础来实现分布式锁?
reids要实现事务操作的话,我了解到主要是靠:WATCH, MULTI, EXEC, DISCARD事务机制等实现分布式锁。
示例代码来源:https://blog.csdn.net/t8116189520/article/details/91383256
简单的示例:
import redis
import time
import uuid
class MyRedisLock:
def __init__(self, lockname):
self.lockid = uuid.uuid4()
self.lockname = lockname
self.redis_client = redis.Redis(host='127.0.0.1', password='123456')
# 获取一个分布式锁
def acquire_lock(self, acquire_time=10, time_out=10):
# 生成唯一id
identifier = str(uuid.uuid4())
# 客户端获取锁的结束时间
end = time.time() + acquire_time
while time.time() < end:
# setnx(key,value) 只有key不存在情况下,将key的值设置为value,若key存在则不做任何动作,返回True和False
if self.redis_client.setnx(self.lockname, self.lockid):
# 设置键的过期时间,过期自动剔除,释放锁
self.redis_client.expire(self.lockname, time_out)
return identifier
# 当锁未被设置过期时间时,重新设置其过期时间
elif self.redis_client.ttl(self.lockname) == -1:
self.redis_client.expire(self.lockname, time_out)
time.sleep(0.001)
return False
# 锁的释放
def release_lock(self):
pipe = self.redis_client.pipeline(True)
while True:
try:
# 通过watch命令监视某个键,当该键未被其他客户端修改值时,事务成功执行。当事务运行过程中,发现该值被其他客户端更新了值,任务失败
pipe.watch(self.lockname)
if pipe.get(self.lockname) == self.lockid: # 检查客户端是否仍然持有该锁
# multi命令用于开启一个事务,它总是返回ok
# multi执行之后, 客户端可以继续向服务器发送任意多条命令, 这些命令不会立即被执行, 而是被放到一个队列中, 当 EXEC 命令被调用时, 所有队列中的命令才会被执行
pipe.multi()
# 删除键,释放锁
pipe.delete(self.lockname)
# execute命令负责触发并执行事务中的所有命令
pipe.execute()
return True
pipe.unwatch()
break
except redis.exceptions.WatchError:
# # 释放锁期间,有其他客户端改变了键值对,锁释放失败,进行循环
pass
return False
引入lua确保原子性执行:
因为我们的上面的改造中release的执行分成了两步,但是还是会有问题?因为每个命令具有原子性,但是如果只是执行了某个命令呢?
redis本身没提供类似setnx这种的原子性操作API,所以最好还是需要依赖引入lua来执行确保原子性操作?python-redis-lock它包含有具体封装lua脚步执行的封装。这里我们的可以直接的使用,其实看看源码的话,挺有帮助的,里面一些内部的里面续租的实现机制!挺好的!这里我就只是简单的引入进行使用!
使用第三方库:
pip install python-redis-lock
这个库其实内置了很多我们的问题解决:
锁过期时间续租问题 上锁和解锁的原子性操作问题
引入之后的使用方式:
from peewee import *
# database = PostgresqlDatabase('zyxadminsystem', **{'host': 'localhost', 'port': 5432, 'user': 'postgres', 'password': '123456'})
from apps.modules.hanxuan_api_online.models.manage_pg_peewee import database, session_scope, session_scope_atomic
from peewee import *
from playhouse.shortcuts import model_to_dict, dict_to_model
# 自定义的
import json
class JSONField(TextField):
def db_value(self, value):
return json.dumps(value)
def python_value(self, value):
if value is not None:
return json.loads(value)
class UnknownField(object):
def __init__(self, *_, **__): pass
class BaseModel(Model):
class Meta:
database = database
class Stocks(BaseModel):
name = TextField(constraints=[SQL("DEFAULT ''::text")], index=True, null=True)
stokenum = IntegerField(null=True)
version = IntegerField(null=True, default=0)
class Meta:
table_name = 'stocks'
import threading
# 全局锁
globalthreading_lock = threading.Lock()
import redis
import time
import uuid
import redis_lock
def ordercreat(num):
while True:
client = redis.Redis(host='127.0.0.1', port=6379, password='123456')
# def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000):
# auto_renewal 是否开启续租,如果执行的逻辑操作了过期时间的设置,自动的进行续租
# expire 锁过去的额时间
rlock = redis_lock.Lock(client, "pingguoshouji-redis-lock", auto_renewal=True, expire=10)
if rlock.acquire():
with database.atomic():
# 执行上锁
# 设置redis连接
for paynum in range(3):
print("执行redis锁!!!!!:")
print("当前执行的线程是:", threading.current_thread().name)
result = Stocks.get_or_none(Stocks.name == '苹果手机')
if result and result.stokenum > 0:
print("开始i执行库存扣减之前,查询到的库存数是:", '线程名称:', threading.current_thread().name, "库存:", result.stokenum)
# 保证更新的原子性-,保证执行到底--只能执行一次!同时之后,在执行的时候,条件不成立!
_updatarresult = Stocks.update(stokenum=Stocks.stokenum - num) \
.where(Stocks.name == '苹果手机') \
.execute()
if _updatarresult:
result = Stocks.get_or_none(Stocks.name == '苹果手机')
print("更新成功!,扣减之后,查询到的库存数是:", '线程名称:', threading.current_thread().name, "库存:", result.stokenum)
else:
print("更新失败!!", threading.current_thread().name, "库存:", result.stokenum)
print("=======》:" * 5)
else:
print("库存不足")
database.rollback()
# 释放锁
rlock.release()
# 退出业务逻辑循环
break
else:
print("已经上锁了!!!!你需要等等!")
if __name__ == '__main__':
liststh = []
for i in range(15):
t = threading.Thread(name='name-t' + str(i), target=ordercreat, args=(10,))
t.start()
liststh.append(t)
for i in liststh:
i.join()
print("主线程结束!")
总结
以上仅仅是个人实践总结梳理,如有笔误!欢迎批评指正!感谢各位大佬!
结尾
END
简书:https://www.jianshu.com/u/d6960089b087
掘金:https://juejin.cn/user/2963939079225608
公众号:微信搜【小儿来一壶枸杞酒泡茶】
小钟同学 | 文 【欢迎一起学习交流】| QQ:308711822




