需求:要将2个Redis实例进行合并,需要考虑是否有重复值
示例示范:
time python3 scan_key_diff.py \
--host1 localhost01 --port1 6379 --password1 123456 \
--host2 localhost02 --port2 6379 --password2 123456 \
--output duplicates.txt --batch 4000 --threads 10 输出示范:
> --host1 localhost01 --port1 6379 --password1 123456 \
> --host2 localhost02 --port2 6379 --password2 123456 \
> --output duplicates.txt --batch 4000 --threads 10
已检查 20373137 个key,发现 0 个重复key
比较完成!
总共检查了 20373137 个key
发现 4 个重复key,已保存到文件: duplicates.txt
使用线程数: 10
real 2m4.613s
user 0m59.965s
sys 0m5.561s
$ cat duplicates.txt
# Redis Key比较结果 20250512_131931
# 源实例: localhost01:6379
# 目标实例: localhost02:6379
# 排除前缀: ['bb', 'cc']
# 线程数: 10
aa_00
k1
xx_00
bourne_00Python脚本代码:
import redis
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
def compare_redis_keys(host1, port1, password1, host2, port2, password2, output_file,
batch_size=4000, thread_count=10):
# 连接池设置(每个线程使用独立连接)
pool1 = redis.ConnectionPool(host=host1, port=port1, password=password1, db=0)
pool2 = redis.ConnectionPool(host=host2, port=port2, password=password2, db=0)
# 排除的前缀列表
exclude_prefixes = ['aa', 'bb', 'cc']
# 统计变量(使用线程安全的方式)
total_checked = 0
duplicate_count = 0
counter_lock = threading.Lock()
# 准备输出文件
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = f"duplicate_keys_{timestamp}.txt" if output_file == "auto" else output_file
def check_key_batch(keys):
nonlocal duplicate_count, total_checked
local_duplicates = []
# 每个线程创建自己的Redis连接
r1 = redis.Redis(connection_pool=pool1)
r2 = redis.Redis(connection_pool=pool2)
# 检查这批key
for key in keys:
key_str = key.decode('utf-8')
if not any(key_str.startswith(prefix) for prefix in exclude_prefixes):
if r2.exists(key):
local_duplicates.append(key_str)
# 更新统计计数
with counter_lock:
total_checked += len(keys)
duplicate_count += len(local_duplicates)
print(f"已检查 {total_checked} 个key,发现 {duplicate_count} 个重复key", end='\r')
return local_duplicates
with open(output_file, 'w', encoding='utf-8') as f:
# 写入文件头
f.write(f"# Redis Key比较结果 {timestamp}\n")
f.write(f"# 源实例: {host1}:{port1}\n")
f.write(f"# 目标实例: {host2}:{port2}\n")
f.write(f"# 排除前缀: {exclude_prefixes}\n")
f.write(f"# 线程数: {thread_count}\n\n")
# 使用线程池并行处理
with ThreadPoolExecutor(max_workers=thread_count) as executor:
futures = []
r1_scan = redis.Redis(connection_pool=pool1)
cursor = '0'
while cursor != 0:
cursor, keys = r1_scan.scan(cursor=cursor, count=batch_size)
if keys:
# 提交批处理任务到线程池
future = executor.submit(check_key_batch, keys)
futures.append(future)
# 收集结果并写入文件
for future in as_completed(futures):
duplicates = future.result()
if duplicates:
f.write('\n'.join(duplicates) + '\n')
# 输出最终结果
print("\n\n比较完成!")
print(f"总共检查了 {total_checked} 个key")
print(f"发现 {duplicate_count} 个重复key,已保存到文件: {output_file}")
print(f"使用线程数: {thread_count}")
if __name__ == "__main__":
import argparse
# 设置命令行参数解析
parser = argparse.ArgumentParser(description='比较两个Redis实例的key重复情况')
parser.add_argument('--host1', default='localhost01', help='第一个Redis主机地址')
parser.add_argument('--port1', type=int, default=6379, help='第一个Redis端口')
parser.add_argument('--password1', default='123456', help='第一个Redis密码')
parser.add_argument('--host2', default='localhost02', help='第二个Redis主机地址')
parser.add_argument('--port2', type=int, default=6379, help='第二个Redis端口')
parser.add_argument('--password2', default='123456', help='第二个Redis密码')
parser.add_argument('--output', default='auto', help='输出文件路径(默认自动生成)')
parser.add_argument('--batch', type=int, default=4000, help='每批处理的key数量')
parser.add_argument('--threads', type=int, default=10, help='使用的线程数')
args = parser.parse_args()
compare_redis_keys(
host1=args.host1,
port1=args.port1,
password1=args.password1,
host2=args.host2,
port2=args.port2,
password2=args.password2,
output_file=args.output,
batch_size=args.batch,
thread_count=args.threads
)「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




