暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Redis scan key 取重复值

原创 陌殇流苏 2025-05-12
146

需求:要将2个Redis实例进行合并,需要考虑是否有重复值


示例示范:

 time python3 scan_key_diff.py \
  --host1 localhost01 --port1 6379 --password1 123456 \
  --host2 localhost02 --port2 6379 --password2 123456 \
  --output duplicates.txt --batch 4000 --threads 10 


输出示范:

>   --host1 localhost01 --port1 6379 --password1 123456 \
>   --host2 localhost02 --port2 6379 --password2 123456 \
>   --output duplicates.txt --batch 4000 --threads 10 
已检查 20373137 个key,发现 0 个重复key

比较完成!
总共检查了 20373137 个key
发现 4 个重复key,已保存到文件: duplicates.txt
使用线程数: 10

real    2m4.613s
user    0m59.965s
sys     0m5.561s

$ cat duplicates.txt
# Redis Key比较结果 20250512_131931
# 源实例: localhost01:6379
# 目标实例: localhost02:6379
# 排除前缀: ['bb', 'cc']
# 线程数: 10

aa_00
k1
xx_00
bourne_00


Python脚本代码:

import redis
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

def compare_redis_keys(host1, port1, password1, host2, port2, password2, output_file, 
                      batch_size=4000, thread_count=10):
    # 连接池设置(每个线程使用独立连接)
    pool1 = redis.ConnectionPool(host=host1, port=port1, password=password1, db=0)
    pool2 = redis.ConnectionPool(host=host2, port=port2, password=password2, db=0)
    
    # 排除的前缀列表
    exclude_prefixes = ['aa', 'bb', 'cc']
    
    # 统计变量(使用线程安全的方式)
    total_checked = 0
    duplicate_count = 0
    counter_lock = threading.Lock()
    
    # 准备输出文件
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_file = f"duplicate_keys_{timestamp}.txt" if output_file == "auto" else output_file
    
    def check_key_batch(keys):
        nonlocal duplicate_count, total_checked
        local_duplicates = []
        
        # 每个线程创建自己的Redis连接
        r1 = redis.Redis(connection_pool=pool1)
        r2 = redis.Redis(connection_pool=pool2)
        
        # 检查这批key
        for key in keys:
            key_str = key.decode('utf-8')
            if not any(key_str.startswith(prefix) for prefix in exclude_prefixes):
                if r2.exists(key):
                    local_duplicates.append(key_str)
        
        # 更新统计计数
        with counter_lock:
            total_checked += len(keys)
            duplicate_count += len(local_duplicates)
            print(f"已检查 {total_checked} 个key,发现 {duplicate_count} 个重复key", end='\r')
        
        return local_duplicates

    with open(output_file, 'w', encoding='utf-8') as f:
        # 写入文件头
        f.write(f"# Redis Key比较结果 {timestamp}\n")
        f.write(f"# 源实例: {host1}:{port1}\n")
        f.write(f"# 目标实例: {host2}:{port2}\n")
        f.write(f"# 排除前缀: {exclude_prefixes}\n")
        f.write(f"# 线程数: {thread_count}\n\n")
        
        # 使用线程池并行处理
        with ThreadPoolExecutor(max_workers=thread_count) as executor:
            futures = []
            r1_scan = redis.Redis(connection_pool=pool1)
            cursor = '0'
            
            while cursor != 0:
                cursor, keys = r1_scan.scan(cursor=cursor, count=batch_size)
                if keys:
                    # 提交批处理任务到线程池
                    future = executor.submit(check_key_batch, keys)
                    futures.append(future)
            
            # 收集结果并写入文件
            for future in as_completed(futures):
                duplicates = future.result()
                if duplicates:
                    f.write('\n'.join(duplicates) + '\n')
    
    # 输出最终结果
    print("\n\n比较完成!")
    print(f"总共检查了 {total_checked} 个key")
    print(f"发现 {duplicate_count} 个重复key,已保存到文件: {output_file}")
    print(f"使用线程数: {thread_count}")

if __name__ == "__main__":
    import argparse
    
    # 设置命令行参数解析
    parser = argparse.ArgumentParser(description='比较两个Redis实例的key重复情况')
    parser.add_argument('--host1', default='localhost01', help='第一个Redis主机地址')
    parser.add_argument('--port1', type=int, default=6379, help='第一个Redis端口')
    parser.add_argument('--password1', default='123456', help='第一个Redis密码')
    parser.add_argument('--host2', default='localhost02', help='第二个Redis主机地址')
    parser.add_argument('--port2', type=int, default=6379, help='第二个Redis端口')
    parser.add_argument('--password2', default='123456', help='第二个Redis密码')
    parser.add_argument('--output', default='auto', help='输出文件路径(默认自动生成)')
    parser.add_argument('--batch', type=int, default=4000, help='每批处理的key数量')
    parser.add_argument('--threads', type=int, default=10, help='使用的线程数')
    
    args = parser.parse_args()
    
    compare_redis_keys(
        host1=args.host1,
        port1=args.port1,
        password1=args.password1,
        host2=args.host2,
        port2=args.port2,
        password2=args.password2,
        output_file=args.output,
        batch_size=args.batch,
        thread_count=args.threads
    )
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论