暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

[译文] MySQL:Python 中的作业队列

原创 KRISTIAN KÖHNTOPP 2021-08-16
275

有人在 Python 中需要一个作业队列:多个写入者以随机顺序插入其中,并将作业写入 MySQL 表jobs。从jobs表中,多个消费者分批n或更小(n=100)声明作业,并处理它们。处理后,消费者删除作业。我们需要并发的作业生成和消耗,以及适当和高效的锁定。

此示例的完整源代码可以在mysql-claim-jobs.py中的mysql-dev-examples中看到。

基础程序

使用我们通常的包含和设置,

from time import sleep from random import randint from sys import exit from multiprocessing import Process import click import MySQLdb import MySQLdb.cursors db_config = dict( host="127.0.0.1", user="kris", passwd="geheim", db="kris", cursorclass=MySQLdb.cursors.DictCursor, )

表jobs和设置

我们创建一个 MySQL 表jobs:

@click.group(help="Generate and consume jobs concurrently") def sql(): pass @sql.command(help="Recreate jobs table") def setup_tables(): sql_setup = [ "drop table if exists jobs", """create table jobs ( id integer not null primary key auto_increment, d varchar(64) not null, e varchar(64) not null, status enum("unclaimed", "claimed", "done") not null, owner_id integer null, owner_date datetime null, index(d), index(status) )""", "commit", ] db = MySQLdb.connect(**db_config) for cmd in sql_setup: try: c = db.cursor() c.execute(cmd) except MySQLdb.OperationalError as e: click.echo(f"setup_tables: failed {e} with {cmd}.") sql()

在我们的表中,字段d和e代表工作负载。我们稍后将根据d字段的内容选择要处理的作业,因此我们会在其上创建一个索引。

每个作业都有一个status(unclaimed或claimed, 用于调试done而不是被删除)、一个所有者和一个访问日期,当我们更改作业的声明状态时,我们会更新这些日期。当我们按状态访问作业时,我们也会对 进行索引status。

多处理

我们生成一组两个同时将作业插入到jobs表中的生成器,以及一组 10 个从jobs表中取出作业的消费者。

@sql.command(help="Run job creation and consumer jobs") def start_processing(): proc_generator = [] for i in range(1, 3): g = Process(target=generator, args=(i,)) proc_generator.append(g) g.start() proc_consumer = [] for i in range(1, 11): c = Process(target=consumer, args=(i,)) proc_consumer.append(c) c.start()

从multiprocessing,我们正在使用Process来处理这个。这些进程中的每一个都使用args=构造函数的命名参数获取标识号作为参数。我们稍后将在 中使用它consumer()来跟踪已声明作业的所有者。

发电机

我们的生成器将generator_id用作参数,但不使用它。

我们生成随机d和e字段,并使用自动递增的数字来识别单个作业。新作业的状态为unclaimed和owner_id且owner_date为NULL。为了速度,我们每十个工作提交一次工作表。

def generator(generator_id): counter = 0 step = 10 cmd = "insert into jobs (id,d,e,status,owner_id,owner_date) values (NULL,%(d)s,%(e)s,'unclaimed',NULL,NULL)" db = MySQLdb.connect(**db_config) c = db.cursor() while True: data = { "d": "".join([chr(randint(97, 97 + 25)) for _ in range(64)]), "e": "".join([chr(randint(97, 97 + 25)) for _ in range(64)]), } c.execute(cmd, data) counter += 1 if counter % step == 0: sleep(0.1) db.commit()

消费者

我们的消费者需要声明满足jobs表中条件的无人认领的工作。为简单起见,我们声明以随机字符开头的行,例如“a%”。

天真的方法使用 UPDATE

在 SQL 中,这可能是

UPDATE jobs SET status = 'claimed', owner_id = ?, owner_date = now() WHERE status = 'unclaimed' AND d LIKE 'a%'

但是如果我们这样做,我们会遇到两个问题:

  • 我们没有得到声称的记录进行处理。我们需要在认领它们后选择它们 ( SELECT id, d, e FROM jobs WHERE status = ‘claimed’ AND owner_id = ?)。
  • 该UPDATE语句产生 X 锁,并且这些一直持续到我们提交。其他尝试声明作业的进程在扫描表时可能会遇到我们的 X 锁。这些进程会挂起。

我们需要另一种方法来处理这个问题以使其有效。

使用智能方法 SELECT … FOR UPDATE SKIP LOCKED

从 MySQL 8 开始,我们SKIP LOCKED在运行 select 时得到这个选项。这在手册中记录如下:

使用 SKIP LOCKED 的锁定读取从不等待获取行锁。查询立即执行,从结果集中删除锁定的行。

使用FOR UPDATE我们可以读取我们想要的行并UPDATE用 X 锁标记它们以备后用。同时,我们忽略已经被其他进程标记为稍后更新的记录而不会停止。

这正是我们所需要的。为了仅标记我们打算声明为锁定的行,我们需要SELECT … .FOR UPDATE SKIP LOCKED通过主键访问语句中的行。也就是说,我们的语句需要具有形式SELECT … FROM jobs WHERE id IN ( 1, 2, 3 ) FOR UPDATE SKIP LOCKED。为了将我们的条件转换为一组主键,我们需要SELECT在锁定之前使用另一条语句SELECT。

所以这是计划:

  1. 运行查询以查找一组候选主键(100 个或更少)。此查询可以在我们的事务之外运行,并且可以使用副本来查找一组候选主键。
  2. 开始交易。这在 Python 中一直发生并且自动发生。
  • 在我们的事务中,SELECT … FROM josb WHERE id IN (…) FOR UPDATE SKIP LOCKED按照讨论运行。
  • 使用上SELECT一条语句的结果,运行UPDATE以声明作业。
  • 运行COMMIT以删除 X 锁,并使更改对其他进程可见。

请注意,候选主键集的大小最多可以为 100 个 id,但也可以为空:我们正在搜索以随机选择的字母开头的无人认领的作业,并且可能没有。

请注意,声明的主键集可以与候选主键集相同或更小。它可能是空的,即使候选集不是:在这种情况下,所有候选者都在我们之前被其他消费者抢走了。我们可能想要计算并记录它,以检测我们在消费者中的并发性是否太高。

寻找候选人

在代码中:

def find_candidates(cursor, wanted): candidate_cmd = f"select id from jobs where d like %(search_string)s and status = %(wanted)s limit 100" search_string = chr(randint(97, 97 + 25)) + '%' try: cursor.execute(candidate_cmd, {"search_string": search_string, "wanted": wanted}) except MySQLdb.OperationalError as e: click.echo(f"find_candidates: {e}") exit(1) candidate_ids = cursor.fetchall() return [c["id"] for c in candidate_ids]

我们正在使用一些任意的 SQL 条件来查找记录。在我们的示例中,我们将WHERE d LIKE ‘a%’ AND status = ‘unclaimed’, 用于可变首字母和可变状态值。我们只对主键感兴趣,所以我们只返回这些,在list.

消费工作

实际consumer()看起来是这样的:

def consumer(consumer_id): db = MySQLdb.connect(**db_config) c = db.cursor() while True: # find potential records to process with the status 'unclaimed' candidates = find_candidates(c, "unclaimed") # we did not find any if len(candidates) == 0: # this is important, it will drop the persistent read view. # not doing this means we will never see newly inserted records from the generator db.commit() continue # with the list of candidate ids, we select them for update, skipping locked records lock_cmd = f"select id, d, e from jobs where status = 'unclaimed' and id in %(candidates)s for update skip locked" c.execute(lock_cmd, {"candidates": candidates}) claimed_records = c.fetchall() # these are the records we want to claim for processing # make us a list of ids of the claimed records claimed_ids = [ r["id"] for r in claimed_records] claimed_ids_count = len(claimed_ids) if len(claimed_ids) == 0: continue # we claim the records, updating their status and owner claim_cmd = """update jobs set status = 'claimed', owner_date = now(), owner_id = %(consumer_id)s where id in %(claimed_ids)s""" c.execute(claim_cmd, {"claimed_ids": claimed_ids, "consumer_id": consumer_id}) db.commit() # doit(claimed_records) -- process the records, that takes some time print(f"consumer {consumer_id: } #records = {claimed_ids_count} - {claimed_ids}") sleep(0.1) # after processing we delete them done_cmd = "delete from jobs where id in %(claimed_ids)s" c.execute(done_cmd, {"claimed_ids": claimed_ids}) db.commit()

也就是说,我们调用find_candidates()以生成候选 id 列表。如果我们没有发现任何,我们运行一个db.commit()我们之前continue。这相当重要:我们在这里处于事务中,并且REPEATABLE READ孤立地意味着我们获得了一致的读取视图。如果我们再试一次,只使用 only continue,我们将停留在同一个事务中,永远不会看到jobs表更新。

使用一组候选 id,我们然后运行SELECT如上所述的锁定。这有两件事:

  • 它返回我们使用的数据 claimed_records = c.fetchall()
  • 它还对行进行 X 锁定以永久声明它们

我们claimed_ids从生成一个列表claimed_records,并检查该列表是否为非空。如果它是空的,我们确实有一个候选 id 列表,但这些都没有通过。这意味着另一个并发进程从我们这里抢走了候选人——所有人。我们应该记录这个,但不在这个示例程序中。

然后UPDATE,我们运行将作业标记为已声明的实际值,并更新所有者和接管的日期时间。我们需要在db.commit()这里将此状态更改写入磁盘。这现在也对其他进程可见。

我们现在可以在闲暇时继续实际处理这些记录。这可能需要时间,我们稍等片刻即可模拟。

处理完成后,我们从jobs表中删除作业,并再次提交以使删除可见。

运行日志

这是一个简短的测试运行协议:

(venv) mysql-dev-examples/mysql-claim-jobs$ ./mysql-claim-jobs.py --help Usage: mysql-claim-jobs.py [OPTIONS] COMMAND [ARGS]... Generate and consume jobs concurrently Options: --help Show this message and exit. Commands: setup-tables Recreate jobs table start-processing Run job creation and consumer jobs (venv) mysql-dev-examples/mysql-claim-jobs$ ./mysql-claim-jobs.py setup-tables (venv) mysql-dev-examples/mysql-claim-jobs$ ./mysql-claim-jobs.py start-processing consumer 4 #records = 1 - [1] consumer 3 #records = 1 - [5] consumer 8 #records = 1 - [17] consumer 6 #records = 1 - [13] consumer 7 #records = 1 - [15] consumer 10 #records = 1 - [11] consumer 2 #records = 1 - [12] consumer 1 #records = 1 - [9] consumer 9 #records = 1 - [3] consumer 5 #records = 1 - [7] consumer 7 #records = 4 - [36, 59, 62, 109] consumer 9 #records = 5 - [44, 85, 113, 119, 127] consumer 8 #records = 3 - [72, 79, 126] consumer 5 #records = 5 - [45, 63, 80, 94, 95] consumer 10 #records = 1 - [51] consumer 3 #records = 7 - [2, 14, 22, 53, 68, 120, 140] consumer 1 #records = 4 - [23, 97, 115, 122] consumer 2 #records = 6 - [4, 47, 65, 99, 103, 105] consumer 6 #records = 6 - [26, 32, 43, 77, 83, 86] consumer 4 #records = 3 - [48, 84, 130] ... consumer 9 #records = 1 - [12413] consumer 3 #records = 2 - [12407, 12445] consumer 7 #records = 2 - [12450, 12459] consumer 5 #records = 2 - [12446, 12470] consumer 1 #records = 3 - [12397, 12426, 12441] consumer 8 #records = 1 - [12458] consumer 2 #records = 15 - [12282, 12313, 12327, 12350, 12365, 12369, 12385, 12399, 12401,\ 12425, 12435, 12443, 12452, 12453, 12462] consumer 4 #records = 3 - [12421, 12440, 12466] ^S consumer 1 #records = 1 - [12515] counter = 6000 counter = 6000 consumer 5 #records = 30 - [12525, 12532, 12575, 12592, 12673, 12676, 12680, 12714, 12719, 12732, 12771, 12804, 12882, 12961, 12977, 13010, 13024, 13060, 13068, 13156, 13204, 13214, 13249, 13279, 13280, 13284, 13299, 13307, 13358, 13396] consumer 10 #records = 31 - [12516, 12531, 12537, 12541, 12623, 12630, 12638, 12742, 12777, 12791, 12792, 12800, 12810, 12819, 12821, 12981, 12990, 13117, 13119, 13131, 13138, 13161, 13180, 13192, 13202, 13259, 13283, 13352, 13367, 13383, 13414]

我们可以看到每个消费者如何抓取可变数量的作业进行处理,有时是单个作业,有时是更多。如果我们让事物运行一段时间,我们会看到 id 增加。当我们使用 Ctrl-S 停止输出时,我们也阻止了消费者。在恢复时,很多工作都在队列中徘徊,并且最初消费者在每次抓取时都声称有一个大的 id 列表。在队列中的作业数量也下降到正常水平后,这很快就会恢复到正常水平。

我们可以在数据库中检查队列长度和状态:

kris@localhost [kris]> select status, count(status) from jobs group by status; +-----------+---------------+ | status | count(status) | +-----------+---------------+ | unclaimed | 47 | | claimed | 23 | +-----------+---------------+ 2 rows in set (0.00 sec)

概括

我们使用并发 Python 程序在数据库中创建了一个作业队列。为了有效地选择作业,我们将任意查询简化为一组候选主键。然后我们使用 aSELECT … FROM jobs WHERE id IN ( … ) FOR UPDATE SKIP LOCKED在候选记录上创建一组 X 锁FOR UPDATE。同时,我们使用SKIP LOCKEDMySQL 8 中的新功能避免等待其他线程的 X 锁。然后我们使用 获取的数据SELECT FOR UPDATE为这些记录提交永久所有权,并使用这些数据进行处理。处理后,我们通过从表中删除作业来进行清理。

jobs如果需要,可以通过对表进行分区来进一步扩展该方法。

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论