有人在 Python 中需要一个作业队列:多个写入者以随机顺序插入其中,并将作业写入 MySQL 表jobs。从jobs表中,多个消费者分批n或更小(n=100)声明作业,并处理它们。处理后,消费者删除作业。我们需要并发的作业生成和消耗,以及适当和高效的锁定。
此示例的完整源代码可以在mysql-claim-jobs.py中的mysql-dev-examples中看到。
基础程序
使用我们通常的包含和设置,
from time import sleep from random import randint from sys import exit from multiprocessing import Process import click import MySQLdb import MySQLdb.cursors db_config = dict( host="127.0.0.1", user="kris", passwd="geheim", db="kris", cursorclass=MySQLdb.cursors.DictCursor, )
表jobs和设置
我们创建一个 MySQL 表jobs:
@click.group(help="Generate and consume jobs concurrently")
def sql():
pass
@sql.command(help="Recreate jobs table")
def setup_tables():
sql_setup = [
"drop table if exists jobs",
"""create table jobs (
id integer not null primary key auto_increment,
d varchar(64) not null,
e varchar(64) not null,
status enum("unclaimed", "claimed", "done") not null,
owner_id integer null,
owner_date datetime null,
index(d),
index(status)
)""",
"commit",
]
db = MySQLdb.connect(**db_config)
for cmd in sql_setup:
try:
c = db.cursor()
c.execute(cmd)
except MySQLdb.OperationalError as e:
click.echo(f"setup_tables: failed {e} with {cmd}.")
sql()
在我们的表中,字段d和e代表工作负载。我们稍后将根据d字段的内容选择要处理的作业,因此我们会在其上创建一个索引。
每个作业都有一个status(unclaimed或claimed, 用于调试done而不是被删除)、一个所有者和一个访问日期,当我们更改作业的声明状态时,我们会更新这些日期。当我们按状态访问作业时,我们也会对 进行索引status。
多处理
我们生成一组两个同时将作业插入到jobs表中的生成器,以及一组 10 个从jobs表中取出作业的消费者。
@sql.command(help="Run job creation and consumer jobs")
def start_processing():
proc_generator = []
for i in range(1, 3):
g = Process(target=generator, args=(i,))
proc_generator.append(g)
g.start()
proc_consumer = []
for i in range(1, 11):
c = Process(target=consumer, args=(i,))
proc_consumer.append(c)
c.start()
从multiprocessing,我们正在使用Process来处理这个。这些进程中的每一个都使用args=构造函数的命名参数获取标识号作为参数。我们稍后将在 中使用它consumer()来跟踪已声明作业的所有者。
发电机
我们的生成器将generator_id用作参数,但不使用它。
我们生成随机d和e字段,并使用自动递增的数字来识别单个作业。新作业的状态为unclaimed和owner_id且owner_date为NULL。为了速度,我们每十个工作提交一次工作表。
def generator(generator_id):
counter = 0
step = 10
cmd = "insert into jobs (id,d,e,status,owner_id,owner_date) values (NULL,%(d)s,%(e)s,'unclaimed',NULL,NULL)"
db = MySQLdb.connect(**db_config)
c = db.cursor()
while True:
data = {
"d": "".join([chr(randint(97, 97 + 25)) for _ in range(64)]),
"e": "".join([chr(randint(97, 97 + 25)) for _ in range(64)]),
}
c.execute(cmd, data)
counter += 1
if counter % step == 0:
sleep(0.1)
db.commit()
消费者
我们的消费者需要声明满足jobs表中条件的无人认领的工作。为简单起见,我们声明以随机字符开头的行,例如“a%”。
天真的方法使用 UPDATE
在 SQL 中,这可能是
UPDATE jobs SET
status = 'claimed',
owner_id = ?,
owner_date = now()
WHERE
status = 'unclaimed' AND
d LIKE 'a%'
但是如果我们这样做,我们会遇到两个问题:
- 我们没有得到声称的记录进行处理。我们需要在认领它们后选择它们 ( SELECT id, d, e FROM jobs WHERE status = ‘claimed’ AND owner_id = ?)。
- 该UPDATE语句产生 X 锁,并且这些一直持续到我们提交。其他尝试声明作业的进程在扫描表时可能会遇到我们的 X 锁。这些进程会挂起。
我们需要另一种方法来处理这个问题以使其有效。
使用智能方法 SELECT … FOR UPDATE SKIP LOCKED
从 MySQL 8 开始,我们SKIP LOCKED在运行 select 时得到这个选项。这在手册中记录如下:
使用 SKIP LOCKED 的锁定读取从不等待获取行锁。查询立即执行,从结果集中删除锁定的行。
使用FOR UPDATE我们可以读取我们想要的行并UPDATE用 X 锁标记它们以备后用。同时,我们忽略已经被其他进程标记为稍后更新的记录而不会停止。
这正是我们所需要的。为了仅标记我们打算声明为锁定的行,我们需要SELECT … .FOR UPDATE SKIP LOCKED通过主键访问语句中的行。也就是说,我们的语句需要具有形式SELECT … FROM jobs WHERE id IN ( 1, 2, 3 ) FOR UPDATE SKIP LOCKED。为了将我们的条件转换为一组主键,我们需要SELECT在锁定之前使用另一条语句SELECT。
所以这是计划:
- 运行查询以查找一组候选主键(100 个或更少)。此查询可以在我们的事务之外运行,并且可以使用副本来查找一组候选主键。
- 开始交易。这在 Python 中一直发生并且自动发生。
- 在我们的事务中,SELECT … FROM josb WHERE id IN (…) FOR UPDATE SKIP LOCKED按照讨论运行。
- 使用上SELECT一条语句的结果,运行UPDATE以声明作业。
- 运行COMMIT以删除 X 锁,并使更改对其他进程可见。
请注意,候选主键集的大小最多可以为 100 个 id,但也可以为空:我们正在搜索以随机选择的字母开头的无人认领的作业,并且可能没有。
请注意,声明的主键集可以与候选主键集相同或更小。它可能是空的,即使候选集不是:在这种情况下,所有候选者都在我们之前被其他消费者抢走了。我们可能想要计算并记录它,以检测我们在消费者中的并发性是否太高。
寻找候选人
在代码中:
def find_candidates(cursor, wanted):
candidate_cmd = f"select id from jobs where d like %(search_string)s and status = %(wanted)s limit 100"
search_string = chr(randint(97, 97 + 25)) + '%'
try:
cursor.execute(candidate_cmd, {"search_string": search_string, "wanted": wanted})
except MySQLdb.OperationalError as e:
click.echo(f"find_candidates: {e}")
exit(1)
candidate_ids = cursor.fetchall()
return [c["id"] for c in candidate_ids]
我们正在使用一些任意的 SQL 条件来查找记录。在我们的示例中,我们将WHERE d LIKE ‘a%’ AND status = ‘unclaimed’, 用于可变首字母和可变状态值。我们只对主键感兴趣,所以我们只返回这些,在list.
消费工作
实际consumer()看起来是这样的:
def consumer(consumer_id):
db = MySQLdb.connect(**db_config)
c = db.cursor()
while True:
# find potential records to process with the status 'unclaimed'
candidates = find_candidates(c, "unclaimed")
# we did not find any
if len(candidates) == 0:
# this is important, it will drop the persistent read view.
# not doing this means we will never see newly inserted records from the generator
db.commit()
continue
# with the list of candidate ids, we select them for update, skipping locked records
lock_cmd = f"select id, d, e from jobs where status = 'unclaimed' and id in %(candidates)s for update skip locked"
c.execute(lock_cmd, {"candidates": candidates})
claimed_records = c.fetchall() # these are the records we want to claim for processing
# make us a list of ids of the claimed records
claimed_ids = [ r["id"] for r in claimed_records]
claimed_ids_count = len(claimed_ids)
if len(claimed_ids) == 0:
continue
# we claim the records, updating their status and owner
claim_cmd = """update jobs
set status = 'claimed',
owner_date = now(),
owner_id = %(consumer_id)s
where id in %(claimed_ids)s"""
c.execute(claim_cmd, {"claimed_ids": claimed_ids, "consumer_id": consumer_id})
db.commit()
# doit(claimed_records) -- process the records, that takes some time
print(f"consumer {consumer_id: } #records = {claimed_ids_count} - {claimed_ids}")
sleep(0.1)
# after processing we delete them
done_cmd = "delete from jobs where id in %(claimed_ids)s"
c.execute(done_cmd, {"claimed_ids": claimed_ids})
db.commit()
也就是说,我们调用find_candidates()以生成候选 id 列表。如果我们没有发现任何,我们运行一个db.commit()我们之前continue。这相当重要:我们在这里处于事务中,并且REPEATABLE READ孤立地意味着我们获得了一致的读取视图。如果我们再试一次,只使用 only continue,我们将停留在同一个事务中,永远不会看到jobs表更新。
使用一组候选 id,我们然后运行SELECT如上所述的锁定。这有两件事:
- 它返回我们使用的数据 claimed_records = c.fetchall()
- 它还对行进行 X 锁定以永久声明它们
我们claimed_ids从生成一个列表claimed_records,并检查该列表是否为非空。如果它是空的,我们确实有一个候选 id 列表,但这些都没有通过。这意味着另一个并发进程从我们这里抢走了候选人——所有人。我们应该记录这个,但不在这个示例程序中。
然后UPDATE,我们运行将作业标记为已声明的实际值,并更新所有者和接管的日期时间。我们需要在db.commit()这里将此状态更改写入磁盘。这现在也对其他进程可见。
我们现在可以在闲暇时继续实际处理这些记录。这可能需要时间,我们稍等片刻即可模拟。
处理完成后,我们从jobs表中删除作业,并再次提交以使删除可见。
运行日志
这是一个简短的测试运行协议:
(venv) mysql-dev-examples/mysql-claim-jobs$ ./mysql-claim-jobs.py --help
Usage: mysql-claim-jobs.py [OPTIONS] COMMAND [ARGS]...
Generate and consume jobs concurrently
Options:
--help Show this message and exit.
Commands:
setup-tables Recreate jobs table
start-processing Run job creation and consumer jobs
(venv) mysql-dev-examples/mysql-claim-jobs$ ./mysql-claim-jobs.py setup-tables
(venv) mysql-dev-examples/mysql-claim-jobs$ ./mysql-claim-jobs.py start-processing
consumer 4 #records = 1 - [1]
consumer 3 #records = 1 - [5]
consumer 8 #records = 1 - [17]
consumer 6 #records = 1 - [13]
consumer 7 #records = 1 - [15]
consumer 10 #records = 1 - [11]
consumer 2 #records = 1 - [12]
consumer 1 #records = 1 - [9]
consumer 9 #records = 1 - [3]
consumer 5 #records = 1 - [7]
consumer 7 #records = 4 - [36, 59, 62, 109]
consumer 9 #records = 5 - [44, 85, 113, 119, 127]
consumer 8 #records = 3 - [72, 79, 126]
consumer 5 #records = 5 - [45, 63, 80, 94, 95]
consumer 10 #records = 1 - [51]
consumer 3 #records = 7 - [2, 14, 22, 53, 68, 120, 140]
consumer 1 #records = 4 - [23, 97, 115, 122]
consumer 2 #records = 6 - [4, 47, 65, 99, 103, 105]
consumer 6 #records = 6 - [26, 32, 43, 77, 83, 86]
consumer 4 #records = 3 - [48, 84, 130]
...
consumer 9 #records = 1 - [12413]
consumer 3 #records = 2 - [12407, 12445]
consumer 7 #records = 2 - [12450, 12459]
consumer 5 #records = 2 - [12446, 12470]
consumer 1 #records = 3 - [12397, 12426, 12441]
consumer 8 #records = 1 - [12458]
consumer 2 #records = 15 - [12282, 12313, 12327, 12350, 12365, 12369, 12385, 12399, 12401,\
12425, 12435, 12443, 12452, 12453, 12462]
consumer 4 #records = 3 - [12421, 12440, 12466]
^S
consumer 1 #records = 1 - [12515]
counter = 6000
counter = 6000
consumer 5 #records = 30 - [12525, 12532, 12575, 12592, 12673, 12676, 12680, 12714, 12719,
12732, 12771, 12804, 12882, 12961, 12977, 13010, 13024, 13060, 13068, 13156, 13204, 13214,
13249, 13279, 13280, 13284, 13299, 13307, 13358, 13396]
consumer 10 #records = 31 - [12516, 12531, 12537, 12541, 12623, 12630, 12638, 12742, 12777,
12791, 12792, 12800, 12810, 12819, 12821, 12981, 12990, 13117, 13119, 13131, 13138, 13161,
13180, 13192, 13202, 13259, 13283, 13352, 13367, 13383, 13414]
我们可以看到每个消费者如何抓取可变数量的作业进行处理,有时是单个作业,有时是更多。如果我们让事物运行一段时间,我们会看到 id 增加。当我们使用 Ctrl-S 停止输出时,我们也阻止了消费者。在恢复时,很多工作都在队列中徘徊,并且最初消费者在每次抓取时都声称有一个大的 id 列表。在队列中的作业数量也下降到正常水平后,这很快就会恢复到正常水平。
我们可以在数据库中检查队列长度和状态:
kris@localhost [kris]> select status, count(status) from jobs group by status;
+-----------+---------------+
| status | count(status) |
+-----------+---------------+
| unclaimed | 47 |
| claimed | 23 |
+-----------+---------------+
2 rows in set (0.00 sec)
概括
我们使用并发 Python 程序在数据库中创建了一个作业队列。为了有效地选择作业,我们将任意查询简化为一组候选主键。然后我们使用 aSELECT … FROM jobs WHERE id IN ( … ) FOR UPDATE SKIP LOCKED在候选记录上创建一组 X 锁FOR UPDATE。同时,我们使用SKIP LOCKEDMySQL 8 中的新功能避免等待其他线程的 X 锁。然后我们使用 获取的数据SELECT FOR UPDATE为这些记录提交永久所有权,并使用这些数据进行处理。处理后,我们通过从表中删除作业来进行清理。
jobs如果需要,可以通过对表进行分区来进一步扩展该方法。




