书接上文:《记录一次MySQL一千六百万大表数据使用CDC同步到doris的坎坷过程》,这次开发同事居然反馈java写的定时同步任务,还是会偶尔丢失一部分数据,而且丢失的数据毫无规律可循。这是什么神一样的同步逻辑?
具体java逻辑我们就不去深究了。接下来是我给他们写的一份Linux shell脚本实现数据增量同步的功能。
实现 MySQL 大表定时同步到 Doris 的 Shell 脚本,需要考虑几个关键点:高效性、增量同步、容错性、定时调度。直接全量同步大表效率低且不实用,通常采用增量同步(如基于时间戳或自增ID)。
以下是一个完整的解决方案,包含 Shell 脚本和必要的配置说明:
核心思路
- 增量同步:记录上次同步的最大时间戳或ID,下次只同步该值之后的数据。
- 分批处理:大表数据分批次读取,避免内存溢出和长时间锁表。
- 数据导出:使用 mysqldump 或 SELECT ... INTO OUTFILE 将增量数据导出为 CSV 文件。
- 数据导入:使用 Doris 的 Stream Load API 将 CSV 文件导入。
- 状态管理:记录同步状态(如最后同步的ID或时间戳)到本地文件或数据库。
- 错误处理:检查每一步的执行结果,失败时进行重试或告警。
- 定时调度:使用 crontab 定时执行脚本。
Shell 脚本 (sync_mysql_to_doris.sh):
#!/bin/bash
# =============================================
#@File : sync_mysql_to_doris.sh
#@Title : MySQL → Doris 增量同步脚本
#@Time : 2025/08/23 16:11:00
#@Author : Beauty Bear
#@Version : 1.0.0
#@Desc : 适用于大表,基于时间戳或自增ID增量同步
# =============================================
# -------------------------------
# 1. 配置与常量 (请根据实际修改)
# -------------------------------
# MySQL 配置
MYSQL_HOST="地址"
MYSQL_PORT="端口"
MYSQL_USER="账号"
MYSQL_PASS="密码"
MYSQL_DB="库名"
MYSQL_TABLE="表名"
# 增量列配置
INCREMENTAL_COLUMN="id" # 增量列名,用于增量同步的列 (时间戳列或自增ID列)
INCREMENTAL_TYPE="bigint" # 类型,如: int, bigint, datetime, timestamp
# Doris 配置
DORIS_HOST="地址"
DORIS_HTTP_PORT="8030" # FE 的 HTTP 端口
DORIS_USER="账号"
DORIS_PASS="密码"
DORIS_DB="库名"
DORIS_TABLE="表名"
DORIS_COLUMNS_SEPARATOR="\t" # Doris 表字段分隔符
STREAM_LOAD_TIMEOUT="600" # 秒
# 同步配置
BATCH_SIZE=100000
MAX_RETRY=3
SLEEP_BETWEEN_BATCHES=1
# 路径配置
BASE_DIR="/data/sync"
LOG_FILE="$BASE_DIR/sync.log"
TEMP_DIR="$BASE_DIR/temp"
CSV_FILE_PREFIX="$TEMP_DIR/export"
CHECKPOINT_FILE="$BASE_DIR/checkpoint.txt"
SYNC_STATUS_FILE="$BASE_DIR/status.txt"
# 其他
CSV_COLUMNS="id,factory_code,factory_name,......" #csv列字段,按mysql中查出的列字段顺序排列
# -------------------------------
# 2. 工具函数
# -------------------------------
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a "$LOG_FILE" >&2
}
error_exit() {
log "错误: $*"
echo "FAILED: $(date) - $*" > "$SYNC_STATUS_FILE"
exit 1
}
# 创建临时目录
setup_temp_dir() {
mkdir -p "$TEMP_DIR" || error_exit "无法创建临时目录: $TEMP_DIR"
}
# 获取上次同步的最后值
get_last_value() {
if [ -f "$CHECKPOINT_FILE" ]; then
cat "$CHECKPOINT_FILE"
else
case "$INCREMENTAL_TYPE" in
"int"|"bigint") echo "0" ;;
"datetime"|"timestamp") echo "1970-01-01 00:00:00" ;;
*) error_exit "不支持的 INCREMENTAL_TYPE: $INCREMENTAL_TYPE" ;;
esac
fi
}
# 更新最后值
update_last_value() {
echo "$1" > "$CHECKPOINT_FILE"
}
# -------------------------------
# 3. 导出 MySQL 增量数据 (带重试)
# -------------------------------
export_mysql_batch_with_retry() {
local last_value="$1"
local batch_file="$2"
local offset="$3"
local limit="$4"
local max_retries=$MAX_RETRY
local attempt=0
local delay=1
while [ $attempt -lt $max_retries ]; do
attempt=$((attempt + 1))
log "开始导出批次 (尝试 $attempt/$max_retries): OFFSET=$offset, LIMIT=$limit"
# 执行导出
if export_mysql_batch_once "$last_value" "$batch_file" "$offset" "$limit"; then
if [ -s "$batch_file" ]; then
log "导出成功: $batch_file, 行数: $(wc -l < "$batch_file")"
return 0
else
log "导出文件为空 (可能无更多数据)"
return 2 # 特殊返回码:无数据
fi
else
local exit_code=$?
log "导出失败 (尝试 $attempt/$max_retries): 退出码 $exit_code"
if [ $attempt -lt $max_retries ]; then
log "等待 $delay 秒后重试..."
sleep $delay
delay=$((delay * 2)) # 指数退避
fi
fi
done
log "经过 $max_retries 次重试,导出最终失败。"
return 1 # 导出失败
}
# 单次导出(不带重试)
export_mysql_batch_once() {
local last_value="$1"
local batch_file="$2"
local offset="$3"
local limit="$4"
# 构建 WHERE 条件
local where_clause=""
case "$INCREMENTAL_TYPE" in
"datetime"|"timestamp")
where_clause="$INCREMENTAL_COLUMN > '$last_value'"
;;
"int"|"bigint"|"smallint"|"mediumint")
where_clause="$INCREMENTAL_COLUMN > $last_value"
;;
*)
log "不支持的 INCREMENTAL_TYPE: $INCREMENTAL_TYPE"
return 1
;;
esac
# 构建 SQL
local sql="SELECT * FROM $MYSQL_TABLE WHERE $where_clause ORDER BY $INCREMENTAL_COLUMN ASC LIMIT $offset, $limit;"
# 执行导出 (使用 mysql 客户端)
# 注意:NULL 转换为 \N
# timeout 300 mysql --ssl-mode=DISABLED ...... 可以增加300秒超时限制
mysql --ssl-mode=DISABLED \
-h"$MYSQL_HOST" -P"$MYSQL_PORT" \
-u"$MYSQL_USER" -p"$MYSQL_PASS" \
"$MYSQL_DB" -N -s -e "$sql" \
2>> "$LOG_FILE" \
> "$batch_file"
# | sed 's/\t/\\N/g' > "$batch_file"
# 检查命令执行结果
if [ $? -ne 0 ]; then
log "mysql 命令执行失败"
return 1
fi
return 0
}
# -------------------------------
# 4. Doris Stream Load
# -------------------------------
stream_load_to_doris() {
local csv_file="$1"
local doris_label="sync_${MYSQL_TABLE}_$(date +%s)_$$"
# 构建 curl 命令
local curl_cmd="curl -s --location-trusted -X PUT \
-H \"label:$doris_label\" \
-H \"max_filter_ratio:0\" \
-H \"column_separator:$DORIS_COLUMNS_SEPARATOR\" \
$( [ -n "$CSV_COLUMNS" ] && echo "-H \"columns: $CSV_COLUMNS\"") \
-H \"format:csv\" \
-H \"timeout:$STREAM_LOAD_TIMEOUT\" \
-H \"Expect: 100-continue\" \
--data-binary @\"$csv_file\" \
\"http://$DORIS_HOST:$DORIS_HTTP_PORT/api/$DORIS_DB/$DORIS_TABLE/_stream_load\" \
-u \"$DORIS_USER:$DORIS_PASS\""
log "执行 Stream Load: $doris_label"
local response
response=$(eval $curl_cmd) || {
log "curl 执行失败"
return 1
}
# 检查 jq 是否安装
if ! command -v jq &> /dev/null; then
log "警告: jq 未安装,使用文本匹配"
echo "$response" | grep -q '"Status"[[:space:]]*:[[:space:]]*"Success"' && return 0
return 1
fi
local status=$(echo "$response" | jq -r '.Status // empty')
if [ -z "$status" ]; then
log "无法解析响应: $response"
return 1
fi
case "$status" in
"Success")
log "导入成功"
return 0
;;
"Label Already Exists")
local existing=$(echo "$response" | jq -r '.ExistingJobStatus // empty')
if [ "$existing" = "FINISHED" ] || [ "$existing" = "VISIBLE" ]; then
log "Label 已存在且成功,跳过"
return 0
else
log "Label 已存在但状态: $existing"
return 1
fi
;;
"Publish Timeout")
log "导入状态: Publish Timeout (通常可视为成功)"
return 0
;;
*)
local msg=$(echo "$response" | jq -r '.Message // "Unknown"')
log "导入失败: Status=$status, Message=$msg"
return 1
;;
esac
}
# -------------------------------
# 5. 主同步逻辑
# -------------------------------
main() {
log "开始同步 MySQL 表 $MYSQL_DB.$MYSQL_TABLE 到 Doris 表 $DORIS_DB.$DORIS_TABLE"
setup_temp_dir
local LAST_VALUE=$(get_last_value)
log "上次同步最后值: $LAST_VALUE"
echo "START: $(date)" > "$SYNC_STATUS_FILE"
local total_rows=0
local batch_count=0
local current_offset=0
local new_last_value="$LAST_VALUE"
local has_more_data=true
while $has_more_data; do
((batch_count++))
local batch_file="${CSV_FILE_PREFIX}_${batch_count}.csv"
local retry_count=0
local import_success=false
# --- 1. 导出批次 (带重试) ---
local export_result
if export_mysql_batch_with_retry "$LAST_VALUE" "$batch_file" "$current_offset" "$BATCH_SIZE"; then
export_result=0
else
export_result=$?
fi
case $export_result in
0)
# 导出成功,有数据
local row_count=$(wc -l < "$batch_file")
total_rows=$((total_rows + row_count))
# --- 2. 导入 Doris (带重试) ---
while [ $retry_count -lt $MAX_RETRY ] && [ "$import_success" = false ]; do
if stream_load_to_doris "$batch_file"; then
import_success=true
log "批次 $batch_count 导入成功"
else
((retry_count++))
log "批次 $batch_count 导入失败,第 $retry_count 次重试..."
sleep 5
fi
done
if [ "$import_success" = true ]; then
# 更新最大值 (使用 MAX 查询)
local max_sql="SELECT MAX($INCREMENTAL_COLUMN) FROM $MYSQL_TABLE WHERE $INCREMENTAL_COLUMN > '$LAST_VALUE' LIMIT $BATCH_SIZE OFFSET $current_offset;"
#local temp_max=$(timeout 30 mysql --ssl-mode=DISABLED -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASS" "$MYSQL_DB" -N -s -e "$max_sql" 2>/dev/null) 有30秒超时限制的语法
local temp_max=$(mysql --ssl-mode=DISABLED -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASS" "$MYSQL_DB" -N -s -e "$max_sql" 2>/dev/null)
if [ -n "$temp_max" ] && [ "$temp_max" != "NULL" ]; then
new_last_value="$temp_max"
fi
current_offset=$((current_offset + BATCH_SIZE))
rm -f "$batch_file"
sleep $SLEEP_BETWEEN_BATCHES
else
error_exit "批次 $batch_count 导入失败,重试 $MAX_RETRY 次仍失败"
fi
;;
2)
# 导出成功,但无数据 -> 正常结束
log "无更多数据,同步完成。"
has_more_data=false
rm -f "$batch_file"
;;
*)
# 导出失败 -> 严重错误
error_exit "批次 $batch_count 导出失败,已重试 $MAX_RETRY 次,任务终止"
;;
esac
done
# --- 6. 更新 checkpoint ---
if [ "$new_last_value" != "$LAST_VALUE" ]; then
update_last_value "$new_last_value"
log "同步完成,共处理 $total_rows 行,最后值更新为: $new_last_value"
else
log "同步完成,无新数据"
fi
echo "SUCCESS: $(date) - 处理 $total_rows 行" >> "$SYNC_STATUS_FILE"
log "同步结束"
}
# -------------------------------
# 6. 启动
# -------------------------------
main "$@"使用步骤
- 安装依赖:
- 确保服务器安装了 mysql 客户端和 curl。
- sudo apt-get install mysql-client curl (Ubuntu/Debian) 或 sudo yum install mysql curl (CentOS/RHEL)。
- 修改脚本配置:
- 打开 sync_mysql_to_doris.sh,仔细修改 配置区域 的所有参数,特别是数据库连接信息、表名、增量列、Doris 地址等。
- 确认 DORIS_COLUMNS_SEPARATOR 与 Doris 表定义的分隔符一致。
- 根据实际网络和数据量调整 BATCH_SIZE 和 STREAM_LOAD_TIMEOUT。
- 设置权限:
chmod +x sync_mysql_to_doris.sh
- 创建日志目录
(如果需要):
sudo mkdir -p /var/log sudo touch /data/sync/sync.log
sudo chown your_user:your_group /data/sync/sync.log # 替换为运行脚本的用户
- 测试脚本:
- 先手动运行一次脚本,观察日志输出 (/data/sync/sync.log),检查是否有连接错误、SQL 错误、导入错误等。
- 确保 Doris 的 Stream Load API 可以被访问,且用户有导入权限。
- 确保 MySQL 用户有读取源表和可能的 FILE 权限(如果使用 INTO OUTFILE)。
- 配置定时任务 (crontab):
- 使用 crontab -e 编辑定时任务。
- 例如,每 5 分钟同步一次:
*/5 * * * * /path/to/sync_mysql_to_doris.sh >> /var/log/mysql_doris_sync_cron.log 2>&1
- 或者每小时同步一次:
0 * * * * /path/to/sync_mysql_to_doris.sh >> /var/log/mysql_doris_sync_cron.log 2>&1
- 重要:确保 crontab 环境变量(如 PATH)包含 mysql 和 curl 的路径,或者在脚本中使用绝对路径。
注意事项与优化
- 增量列选择:update_time 是常见选择,但需确保其准确性。id 更简单,但要求是单调递增的。
- 数据一致性:此脚本基于“上次值”进行同步,如果 MySQL 中有数据被修改(非追加),可能会遗漏。确保业务逻辑支持这种增量模式。
- NULL 值处理:可以使用 sed 's/\t/\\N/g' 将 NULL 转换为 \N。确保 Doris 表的 null_value 属性设置为 \N。更复杂的数据(如包含分隔符的字符串)需要更精细的处理(如使用 mysqldump --tab 或 SELECT ... INTO OUTFILE 并正确转义)。
- 错误处理:脚本包含基本错误处理和重试。可根据需要增强,例如发送邮件/短信告警。
- 性能:
- 在 MySQL 的 INCREMENTAL_COLUMN 上建立索引至关重要。
- 调整 BATCH_SIZE 以平衡内存使用和导入效率。
- 考虑 Doris 的导入性能瓶颈。
- 首次全量同步:如果是首次同步,LAST_VALUE_FILE 不存在,脚本会从初始值开始。如果表非常大,首次全量同步可能耗时很长,建议在业务低峰期进行,或先手动做一次全量导入。
- 并发与锁:频繁的 SELECT 可能对 MySQL 造成压力,考虑使用从库进行读取。
- Doris Label:Stream Load 的 label 必须唯一,脚本使用时间戳和进程ID保证。如果导入中断重试,相同的 label 会导致 Label Already Exists 错误,脚本的重试机制可以处理。
- 监控:定期检查日志文件和 Doris 的导入任务状态。
注意:根据你的具体环境(网络、数据量、表结构、Doris 配置),可能需要进行微调。务必先在测试环境充分验证!
最后修改时间:2025-08-25 10:06:41
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




