场景:在一个系统中有持续不断的任务产生,并且需要尽快处理。
解析:由于任务量巨大,对于下游处理者就有很多要求:
分配的任务不能重复;
处理者可以动态扩缩容;
任务处理超时重新分配。
方案:一个任务分发系统可以看成一个复杂一点的秒杀系统。拆解来看,主要有三部分组成:任务分发、延时队列以及分布式锁。
本文主要介绍任务分发模块。
通常来说,对于任务处理都会有一定的优先级规则,比如时间维度。我们知道 Redis 中的有序集合 zset 是可以根据 score 进行排序。我们刚好可以借助 zset 来实现一个任务池。
def dict_slice(mapping, start, end):"""切分字典:param mapping: 初始字典:param start: 其实位置,包含:param end: 结束位置,不包含:return: dict"""keys = mapping.keys()return {k: mapping[k]for k in keys[start:end]}class TaskPool(object):"""任务池"""BIG_TASK_SIZE = 500def __init__(self, key):self.key = keydef add(self, task, priority):"""添加一个任务到任务池:param task: str/int, 任务名称:param priority: float/int, 任务优先级"""client.zadd(self.key, {str(task): priority})def clear(self):return client.delete(self.key)def add_many(self, mapping):"""一次添加多个任务:param mapping: dict, {task1: priority1, task2: priority2}"""length = len(mapping)if length > self.BIG_TASK_SIZE:for start in range(0, length, self.BIG_TASK_SIZE):piece = dict_slice(mapping, start, start+self.BIG_TASK_SIZE)client.zadd(self.key, piece)else:client.zadd(self.key, mapping)def assign(self, size=20, factory=int):"""分配任务,默认一次分配 20 个:param size: int, 任务数量:param factory: callable, 格式化:return: list, ['4', '3', '2']"""pipe = client.pipeline()pipe.zrange(self.key, 0, size - 1)pipe.zremrangebyrank(self.key, 0, size - 1)tasks = pipe.execute()[0]return map(factory, tasks)
当有了一个任务池之后,我们就可以每次从这个任务池中申请一定数量的任务进行处理。
class TaskBucket(object):"""分配到个人的任务桶"""def __init__(self, name, exp):self.name = nameself.exp = expdef add(self, tasks):"""添加任务:param tasks: list, [1,2,3]"""if not tasks:returnclient.rpush(self.name, *tasks)self.expire(self.exp)def expire(self, time):"""设置过期时间"""return client.expire(self.name, time)def clear(self):"""清空"""return client.delete(self.name)def pop(self, reverse=False):"""从自己的列表中获取一个任务"""if not client.exists(self.name):returnself.expire(self.exp)if reverse:return client.rpop(self.name)return client.lpop(self.name)def peek_all(self, factory=int):"""查看所有任务:param factory: callable,类型转换方法:return: list[factory(val)]"""if not client.exists(self.name) or client.ttl(self.name) <= 0:logging.info('key {} not exists or ttl <= 0'.format(self.name))return []return map(factory, client.lrange(self.name, 0, -1))
我们还需要一个中间关联者,关联起池子跟桶的关系。
class TaskRouter(object):"""任务路由器"""def __init__(self, key, prefix):self.key = keyself.prefix = prefixdef make_task_name(self, val):return self.prefix.format(val)def add(self, name, task_name=None):"""添加处理人:param name: str:param task_name: str:return: str, 任务桶名称"""if task_name is None:task_name = self.make_task_name(name)client.hset(self.key, name, task_name)return task_namedef clear(self):"""清空"""return client.delete(self.key)def get(self, name):"""根据名称获取任务桶名称:param name: str, 处理人名称:return: str/None, 任务桶名称"""return client.hget(self.key, name)def buckets(self):"""查看所有的任务桶"""return client.hvals(self.key)
当我们有了任务池 TaskPool、任务桶 TaskBucket 以及任务路由器 TaskRouter,我们就可以实现最终的任务分发器。
class TaskDispenser(object):def __init__(self, pool_key, router_key, bucket_prefix):self.task_pool = TaskPool(pool_key)self.task_router = TaskRouter(router_key, bucket_prefix)def recreate_task_pool(self, mapping):"""构建任务池:param mapping: dict, {value: score}"""buckets = self.task_router.buckets()# 收集已经分配的任务assigned_flow_ids = []for name in buckets:tb = TaskBucket(name)assigned_flow_ids.extend(tb.peek_all())# 移除已经分配的任务for fid in assigned_flow_ids:mapping.pop(fid, None)# 重构任务池self.task_pool.clear()self.task_pool.add_many(mapping)def assign_tasks(self, limit, worker):"""任务分配:param limit: int, 任务数量:param worker: str, 任务处理者:return: list[int]"""bucket_name = self.task_router.get(worker)if not bucket_name:bucket_name = self.task_router.add(worker)tb = TaskBucket(bucket_name)tasks = tb.peek_all()size = len(tasks)if size == limit:return tasks# 分配任务时需要先检查自己的桶里是否有未处理的任务# 并且确认数量是否满足要求if size < limit:# 少补new_tasks = self.task_pool.assign(limit-size)if new_tasks:tb.add(new_tasks)tasks.extend(new_tasks)else:# 多不退,通过下次重建任务池的方式重新分配for _ in range(size-limit):tb.pop(reverse=True)tasks = tb.peek_all()return tasksdef check_tasks(self, worker, tasks):"""校验任务:param worker: str, 任务处理者:param tasks: list, 需要校验的任务:return: bool"""bucket_name = self.task_router.get(worker)if not bucket_name:return Falsetb = TaskBucket(bucket_name)bucket_tasks = tb.peek_all()return set(bucket_tasks) == set(tasks)def tasks_done(self, worker, tasks):"""任务完成"""bucket_name = self.task_router.get(worker)if not bucket_name:return Falsetb = TaskBucket(bucket_name)tb.clear()return True
我们通过调用 recreate_task_pool 方法实现创建任务池,调用 assign_tasks 方法分配任务,最后调用 tasks_done 方法完成任务。
整个过程看起来似乎没有什么问题。但是试想一下,在并发场景下会不会有问题呢。由于 recreate_task_pool、assign_tasks 以及 tasks_done 方法同时在操作相同的数据源,是必然会有并发问题的,也就是数据不一致问题。
那么该如何解决呢,且听下回分解。
文章转载自剽悍的派森先生,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




