前面分享了一篇文章,《数仓实战|Routine Load实时同步Kafka数据到Doris》,介绍了我们项目上加载kafka数据到Doris的方案。随着项目开发的推进,有了越来越多的Routine Load任务,这时候,运行瓶颈就出现了。不管是源系统数据质量问题,还是网络波动或者kafka内存异常的,都可能导致Routine Load任务执行失败,流数据加载中断。

如前文所述,短期内数据加载中断,我们可以通过下面的重启命令恢复数据同步功能。
resume routine load for xxx; --重启xxx导入任务
或者在问题处理以后,将Routine Load任务状态重置,从最早的offset开始读取数据。
ALTER ROUTINE LOAD FOR XXPROPERTIES("desired_concurrent_number" = "1")FROM kafka("kafka_partitions" = "0","kafka_offsets" = "OFFSET_BEGINNING","property.group.id" = "xxx_topic","property.client.id" = "doris"); --修改任务参数,从kafka队列最开始重新读取
但是,如果不能及时发现Routine Load异常,数据中断时长超过kafka缓存数据有效期,那就麻烦了,需要重新通过DataX补抽数据。
于是,我就想到了用Python脚本来监控Doirs的例行导入任务。前文分享过一篇文章,《新知识|python连接mysql,使用mysqldb和mysqlclient、pymysql三者的异同》,经过对比,最终决定还是用pymysql插件,比较这个安装最简单了。
监控Routine Load状态,最重要的就是要连上不同的database,执行show routine load 命令,并获取结果进行判断和预警。至于预警,很自然的我就想到了前几天学到的 钉钉机器人。后面发现,企业微信也有同款机器人,操作非常简单,这里也截图说明一下。
首先,在企业微信中发起一个群聊。

然后,在群设置中添加机器人。添加“添加机器人”,

填写机器人信息:

然后点击复制地址,即可完成。

完成了前置准备工作,接下来直接上代码了。其实也很简单,就几十行Python。
#!/usr/bin/python# -*- coding: UTF-8 -*-import pymysqlimport requestsimport sysif sys.getdefaultencoding() != 'utf-8':reload(sys)sys.setdefaultencoding('utf-8')SEND_URL='https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=${token}'def send_message(mess):print("收到的消息内如如下:%s"%mess)url = SEND_URLpagrem = {"msgtype": "text","text": {"content": mess,"mentioned_list":["@all"],"mentioned_mobile_list":["@all"]},}headers = {'Content-Type': 'application/json'}print(json.dumps(pagrem))resp = requests.post(url, data=json.dumps(pagrem), headers=headers)print(resp.status_code)print(resp.text)def check_schema_load(schema):# 连接databaseconn = pymysql.connect(host='192.168.x.x',port=9030,user='root',password='xxxx',database=schema,charset='utf8')# 得到一个可以执行SQL语句的光标对象cursor = conn.cursor() # 执行完毕返回的结果集默认以元组显示# 得到一个可以执行SQL语句并且将结果作为字典返回的游标cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)# 定义要执行的SQL语句sql = """show routine load;"""# 执行SQL语句cursor.execute(sql)res = cursor.fetchall()err_list=[]print(u"查询到DRP的routine load任务数为:%d"%len(res))for row in res:print(u"任务ID:%s 任务名:%s 目标数据库:%s 目标表名:%s 任务状态:%s"%(row['Id'],row['Name'],row['DbName'],row['TableName'],row['State']))if (row['State'] == "RUNNING"):continueelse:row_mess = u"任务名称:%s 目标表名:%s 状态变化原因:%s 错误日志URL:%s 补充信息:%s "%(row['Name'],row['TableName'],row['ReasonOfStateChanged'],row['ErrorLogUrls'],row['OtherMsg'])err_list.append(row_mess)err_mess =u'检查到%s模式下的异常routine load有:\n'%schemaif len(err_list) >0 :for index,mess in enumerate(err_list):print(mess)err_mess = err_mess + str(index+1) + '. ' + mess + '\n'send_message(err_mess)# 关闭光标对象cursor.close()# 关闭数据库连接conn.close()if __name__ == "__main__":schema_list = ['ods_drp','ods_hana','ods_e3_fx','ods_e3_jv','ods_e3_zy','ods_e3_pld','ods_rpa','ods_xgs','ods_xt1']for schema in schema_list:check_schema_load(schema)
完成代码调试以后,企业微信群收到的消息截图如下:

有了脚本以后,到服务器上安装pymysql requests 两个包,进一步进行调试。
pip install pymysql requests
然后上传脚本,进行调试。
python monitor_doris_routine_load_dd.py
执行crontab 配置定时任务如下:
0 7-23 * * * python data/cron_shell/monitor_doris_routine_load_dd.py >>/data/cron_shell/log/monitor_doris_routine_load_dd.log

《数据中台研习社》微信群,请添:laowang5244,备注【进群】
🧐分享、点赞、在看,给个三连击呗!👇




