暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

将 DataX 执行结果通过钉钉上报

白粥笔记 2021-06-04
1198

在上文《使用 Docker 运行 DataX 定时全量备份关键数据表》中我们知道了如何运行 DataX 备份程序,接下来我们实现将任务执行结果通过钉钉自定义机器人通知。


DataX 的 Hook 机制

DataX 的代码库,在“common/src/main/java/com/alibaba/datax/common/spi/Hook.java”类中定义了开放的接口:

public interface Hook {


/**
* 返回名字
*
* @return
*/
public String getName();


/**
* TODO 文档
*
* @param jobConf
* @param msg
*/
public void invoke(Configuration jobConf, Map<String, Number> msg);


}


在“core/src/main/java/com/alibaba/datax/core/container/util/HookInvoker.java”类中使用 ServiceLoader 机制加载并 Hook 的实现类。


在“core/src/main/java/com/alibaba/datax/core/job/JobContainer.java”类中,当任务结束后调用“HookInvoker”执行了所有的 Hook:

/**
* Created by jingxing on 14-8-24.
* <p/>
* job实例运行在jobContainer容器中,它是所有任务的master,负责初始化、拆分、调度、运行、回收、监控和汇报
* 但它并不做实际的数据同步操作
*/
public class JobContainer extends AbstractContainer {

/**
* jobContainer主要负责的工作全部在start()里面,包括init、prepare、split、scheduler、
* post以及destroy和statistics
*/
@Override
public void start() {
LOG.info("DataX jobContainer starts job.");


boolean hasException = false;
boolean isDryRun = false;
try {
this.startTimeStamp = System.currentTimeMillis();
isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
if(isDryRun) {
LOG.info("jobContainer starts to do preCheck ...");
this.preCheck();
} else {
userConf = configuration.clone();
LOG.debug("jobContainer starts to do preHandle ...");
this.preHandle();


LOG.debug("jobContainer starts to do init ...");
this.init();
LOG.info("jobContainer starts to do prepare ...");
this.prepare();
LOG.info("jobContainer starts to do split ...");
this.totalStage = this.split();
LOG.info("jobContainer starts to do schedule ...");
this.schedule();
LOG.debug("jobContainer starts to do post ...");
this.post();


LOG.debug("jobContainer starts to do postHandle ...");
this.postHandle();
LOG.info("DataX jobId [{}] completed successfully.", this.jobId);
// ⚠️ 执行hook ⚠️
this.invokeHooks();
}
}
// 省略
}

/**
* 调用外部hook
*/
private void invokeHooks() {
Communication comm = super.getContainerCommunicator().collect();
HookInvoker invoker = new HookInvoker(CoreConstant.DATAX_HOME + "/hook", configuration, comm.getCounter());
invoker.invokeAll();
}

}


其中,“configuration”为对应 job 的 json 配置信息,“comm.getCounter()”存放相关的执行统计。我们通过实现 Hook 接口,就可以拿到相应的信息。


实现 Hook

新建一个项目,将 DataX 的“datax-common-0.0.1-SNAPSHOT.jar”和钉钉的“taobao-sdk-java-auto_1479188381469-20210528.jar”加入到 classpath 中。


然后编写继承 Hook 接口的实现类:DingTalkReport。

public class DingTalkReport implements Hook {
private static final Logger LOG = LoggerFactory.getLogger(DingTalkReport.class);
private static final SimpleDateFormat dateFormat = new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss");


@Override
public String getName() {
return "DingTalkReportHook";
}


@Override
public void invoke(Configuration configuration, Map<String, Number> map) {
LOG.debug(configuration.beautify());
//{writeSucceedRecords=1248,
// readSucceedRecords=1247,
// totalErrorBytes=0,
// writeSucceedBytes=81477,
// byteSpeed=0,
// totalErrorRecords=0,
// recordSpeed=0,
// waitReaderTime=308600221,
// writeReceivedBytes=81477,
// stage=1,
// waitWriterTime=6348796,
// percentage=1.0,
// totalReadRecords=1247,
// writeReceivedRecords=1248,
// readSucceedBytes=81477,
// totalReadBytes=81477}
LOG.debug(map.toString());


// 从job的json配置中读取自定义的参数
String accessToken = getString(configuration.get("job.dingTalkReporter.accessToken"));
String title = getString(configuration.get("job.dingTalkReporter.title"));
String secret = getString(configuration.get("job.dingTalkReporter.secret"));
String defaultTemplate = "# %s \n > %s \n - 成功读:%d \n - 成功写:%d \n - 等待%d秒";
String time = dateFormat.format(System.currentTimeMillis());
// 好像没办法拿到执行时间,如果需要更多map中没有的信息,可以改写Hook接口
long totalWaitTime = TimeUnit.SECONDS.convert(map.get("waitReaderTime").longValue() + map.get("waitWriterTime").longValue(), TimeUnit.NANOSECONDS);


String content = String.format(defaultTemplate, title, time, map.get("readSucceedRecords").longValue(), map.get("writeSucceedRecords").longValue(), totalWaitTime);


DingTalkUtil.send(title, content, accessToken, secret);
}


String getString(Object obj){
return null == obj ? "" : String.valueOf(obj);
}


}


打包

假定包名为:com.bz.datax.hook,在 resources 下创建目录“META_INF/services”,在 services 文件夹下,新建文件“com.alibaba.datax.common.spi.Hook”,文件内容为 Hook 接口实现类的路径。


com.bz.datax.hook.DingTalkReport

然后通过 idea 打包即可。


集成 Hook

引入 Hook 实现包


在 DataX 根目录下,新建“hook”文件夹,然后在其下创建“dingtalk”文件夹(也可自定义其他名字),将我们打包好的 jar 包放到“dingtalk”文件夹下。


新增 job 配置

接入钉钉自定义机器人,需要引入额外的配置信息,我们可以在 job 任务描述文件新增钉钉的配置参数“dingTalkReporter”,利用 configuration 类提取即可。

{
"job": {
"dingTalkReporter": {
"accessToken": "d9e5b4f89xxxxxxxxxxx",
"secret": "SECc008578xxxxxxx",
"title": "Test表同步"
},
"setting": {
"speed": {
"byte":10485760
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
……
}


其中 accessToken、title 为必填,secret 为选填。


启动效果:

2021-06-03 10:39:08.673 [job-0] INFO  HookInvoker - Invoke hook [DingTalkReportHook], path: /datax/hook/dingtalk
2021-06-03 10:39:09.415 [job-0] INFO DingTalkUtil - Send DingTalk Message Result:ok-0


通知效果:



参考

  1. 完整项目代码参加 Github:https://github.com/mchange/datax-dingtalk-report

  2. 钉钉自定义机器人接入文档:https://developers.dingtalk.com/document/app/custom-robot-access


文章转载自白粥笔记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论