在任务分发系统设计(一)中我们实现了一个基本任务分发系统:通过调用 recreate_task_pool 方法实现创建任务池;调用 assign_tasks 方法分配任务;最后调用 tasks_done 方法完成任务。这三个方法使用过程中需要小心的一点是数据一致性问题,因为它们都是在处理同一份数据。至于创建任务池的时间可以是在线实时生成的,也可以是离线定时生成,只要不丢失数据即可。
为了防止不同方法之间的干扰,我们需要实现一个分布式锁来解决这个问题。下面我们就用 Redis 来实现一个简易的分布式锁。
class FunctionLock(object):TIMEOUT = 60 # 锁过期超时@classmethoddef gen_name(cls, key):return "{prefix}:{key}".format(prefix=cls.__name__, key=key)@classmethoddef get_lock(cls, name, timeout=0):return cod.set(name, "1", ex=timeout or cls.TIMEOUT, nx=True)@classmethoddef acquire(cls, key, timeout=0):name = cls.gen_name(key)return cls.get_lock(name, timeout=timeout)@classmethoddef release(cls, key):name = cls.gen_name(key)cod.delete(name)def synchronized(prefix, local=0, timeout=60):def decorator(func):@functools.wraps(func)def wrapper(*args, **kwargs):key = prefix + str(args[local])if not FunctionLock.acquire(key, timeout=timeout):raise RuntimeError("PROCESSING LOCKED")try:return func(*args, **kwargs)finally:FunctionLock.release(id)return wrapperreturn decorator
简单介绍一下实现原理。借用 Redis 的 set 方法,加入了 nx(not exist)参数,可以保证如果已有 key 存在,则函数不会调用成功,也就是只有一个客户端能持有锁,满足互斥性。其次,由于我们对锁设置了 ex(expire)过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动解锁(即 key 被删除),不会发生死锁。
使用的时候非常方便。
class TaskDispenser(object):key = "_dispenser_"def __str__(self):return self.pool_keydef __init__(self, pool_key, router_key, bucket_prefix):self.pool_key = pool_keyself.task_pool = TaskPool(pool_key)self.task_router = TaskRouter(router_key, bucket_prefix)@synchronized(key)def recreate_task_pool(self, mapping):"""构建任务池:param mapping: dict, {value: score}"""buckets = self.task_router.buckets()# 收集已经分配的任务assigned_flow_ids = []for name in buckets:tb = TaskBucket(name)assigned_flow_ids.extend(tb.peek_all())# 移除已经分配的任务for fid in assigned_flow_ids:mapping.pop(fid, None)# 重构任务池self.task_pool.clear()self.task_pool.add_many(mapping)@synchronized(key)def assign_tasks(self, limit, worker):"""任务分配:param limit: int, 任务数量:param worker: str, 任务处理者:return: list[int]"""bucket_name = self.task_router.get(worker)if not bucket_name:bucket_name = self.task_router.add(worker)tb = TaskBucket(bucket_name)tasks = tb.peek_all()size = len(tasks)if size == limit:return tasks# 分配任务时需要先检查自己的桶里是否有未处理的任务# 并且确认数量是否满足要求if size < limit:# 少补new_tasks = self.task_pool.assign(limit-size)if new_tasks:tb.add(new_tasks)tasks.extend(new_tasks)else:# 多不退,通过下次重建任务池的方式重新分配for _ in range(size-limit):tb.pop(reverse=True)tasks = tb.peek_all()return tasksdef check_tasks(self, worker, tasks):"""校验任务:param worker: str, 任务处理者:param tasks: list, 需要校验的任务:return: bool"""bucket_name = self.task_router.get(worker)if not bucket_name:return Falsetb = TaskBucket(bucket_name)bucket_tasks = tb.peek_all()return set(bucket_tasks) == set(tasks)@synchronized(key)def tasks_done(self, worker, tasks):"""任务完成"""bucket_name = self.task_router.get(worker)if not bucket_name:return Falsetb = TaskBucket(bucket_name)tb.clear()return True
通过装饰器的方式把以上三个方法全部加上同步处理,使得这三个方法可以在并发的情况下顺序执行,从而保证数据一致性。
文章转载自剽悍的派森先生,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




