暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

GoldenDb数据迁移梧桐云原生分析型数据库技术解决方案

原创 easilyyy 2024-10-28
117

GoldenDb数据迁移梧桐云原生分析型数据库技术解决方案

1、概述

        随着数据库国产化不断推进,某某公司有部分重要接口数据,源端使用的是GoldenDb数据库。如何快速高效将这部分GoldenDb数据库中的数据加载到梧桐云原生分析型数据库,将成为梧桐云原生分析型数据库承载这部分GoldenDb数据业务的关键因素。

2、数据迁移方案

        GoldenDb数据抽取到梧桐云原生分析型数据库采用MapReduce Hash分布方式,将GoldenDb数据库中的表数据抽取并加载到梧桐云原生分析型数据库。具体功能架构如下图所示:

GoldenDb迁移梧桐数据库_HASH数据处理流程.png

       2.1 数据HASH

计算GoldenDb ROWID散列值,GoldenDb数据库兼容了Oracle数据库特性,支持ROWID,采用HASH函数计算ROWID的散列值,如下图所示:

select * FROM xxxxxx.xx_xx_xx_xxxx;

GoldenDb迁移梧桐数据库_计算rowid散列值.png

       2.2 数据切片

按照进程数目进行数据切片,每一个进程负责抽取相对应的切片数据,多进程抽取,提升数据抽取效率。
代码实现

public static void main (String[] args){

//遍历入参
for (int i=0;i != args.length;i++){
String key = args[i].Substring(0,pos);
String value = new String(args[i].Substring(pos+i).trim());

//取设定的 map(分片) 个数
if(key.equalsIgnoreCase(“map”))maps=Integer.parseInt(value);

}

//生成各个Map任务的SQL语句;
BufferedOutputStream bos = new BufferedOutputStream(fs.create(inputPath));
if(cnt <= 100000 || maps == 1) {
maps = 1;
bos.write(sql_text.getBytes(), 0, sql_text.length());
}
else for(int i = 0; i != maps ; i++) {
if(sql_text != null ){
Matcher m = pattern.matcher(sql_text);
if(m.find())sql = sql_text + " where ";
else sql = sql_text + " and ";
}
else {
sql = String.format("select /*+ parallel(6) */ %s from %s.%s where ", columns, unitConf.tableUser, unitConf.tableName);
if(whereString != null && whereString.length() > 0)
sql = sql + whereString + " and ";
}
sql = sql + “nvl(mod(ora_hash(” + keyColumn + "), " + maps + "), 0) = " + i + “\n”;
bos.write(sql.getBytes(), 0, sql.length());
}
bos.close();
}

日志显示

GoldenDb迁移梧桐数据库_数据切片.png

       2.3 数据写入HDFS

1、首先连接到 GoldenDb 数据库
代码实现

logger.info(“连接到Goldendb数据库…”);
Class.forName(“com.goldendb.jdbc.Driver”).newInstance();
connection = DriverManager.getConnection(unitConf.jdbcUrl, unitConf.dbUser, unitConf.dbPassword);
connection.setReadOnly(true);

日志显示

GoldenDb迁移梧桐数据库_连接数据库.png

2、执行SQL
代码实现

logger.info(String.format(“执行SQL[%s]…”, sql));
statement = connection.prepareStatement(sql);
statement.setFetchSize(unitConf.batchRows<<4);
result = statement.executeQuery();
ResultSetMetaData meta = result.getMetaData();
int fieldCount = meta.getColumnCount();

日志显示

GoldenDb迁移梧桐数据库_执行SQL.png

3、数据写入HDFS
代码实现

logger.info(“发送数据到HDFS…”);
int rowCount = 0;
StringBuilder sb = new StringBuilder(1048576);
String colValue = null;
while (result.next()) {
// 构造消息记录。
for(int i = 1; i <= fieldCount; i++){
if(i > 1)sb.append(delimiter);
colValue = result.getString(i);
if(colValue != null) {
if(meta.getColumnType(i) == java.sql.Types.TIMESTAMP)
colValue = colValue.substring(0,19);
else
colValue = colValue.replaceAll("(\n|\r|\s)", “”);
sb.append(colValue);
}
}
sb.append(’\n’);
rowCount++;
if((rowCount % unitConf.batchRows) == 0){
String messageStr = sb.substring(0, sb.length()-1);
multipleOutputs.write(nada, new Text(messageStr), fileName);
sb = new StringBuilder(1048576);
logger.debug(String.format(“已经发送【%d】条数据到HDFS…”, rowCount));
}
}
if((rowCount % unitConf.batchRows) != 0){
String messageStr = sb.substring(0, sb.length()-1);
multipleOutputs.write(nada, new Text(messageStr), fileName);
logger.debug(String.format(“Message:%s\n”, messageStr));
}
logger.info(String.format(“发送数据到Hdfs成功,已经发送记录数【%d】”, rowCount));
logger.info(“移动文件到结果目录…”);

日志显示

GoldenDB迁移梧桐数据库_数据写入HDFS.png

       2.4 数据装载梧桐云原生分析型数据库

1、登录梧桐云原生分析型数据库
psql -h -p -U <用户名称> -d
2、创建外部表
psql> create readable external table ext_tb_???
( ??? date,

??? text
)
LOCATION ( 'hdfs:///xxxx:???/???/???/<tb_name>*.dat’)
FORMAT ‘TEXT’ ( DELIMITER );
3 外部表加载到内部表
insert into <模式名>.<表名> select * from ext_tb
???;

最后修改时间:2024-10-28 23:36:43
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论