暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

数仓实战|Python轻松搞定Doris 例行导入(Routine Load)任务监控

数据中台研习社 2021-09-27
7285

     前面分享了一篇文章,《数仓实战|Routine Load实时同步Kafka数据到Doris》,介绍了我们项目上加载kafka数据到Doris的方案。随着项目开发的推进,有了越来越多的Routine Load任务,这时候,运行瓶颈就出现了。不管是源系统数据质量问题,还是网络波动或者kafka内存异常的,都可能导致Routine Load任务执行失败,流数据加载中断。

      如前文所述,短期内数据加载中断,我们可以通过下面的重启命令恢复数据同步功能。

    resume routine load for xxx;  --重启xxx导入任务

          或者在问题处理以后,将Routine Load任务状态重置,从最早的offset开始读取数据。

      ALTER ROUTINE LOAD FOR XX
      PROPERTIES
      (
      "desired_concurrent_number" = "1"
      )
      FROM kafka
      (
      "kafka_partitions" = "0",
      "kafka_offsets" = "OFFSET_BEGINNING",
      "property.group.id" = "xxx_topic",
      "property.client.id" = "doris"
      ); --修改任务参数,从kafka队列最开始重新读取


             但是,如果不能及时发现Routine Load异常,数据中断时长超过kafka缓存数据有效期,那就麻烦了,需要重新通过DataX补抽数据。

            于是,我就想到了用Python脚本来监控Doirs的例行导入任务。前文分享过一篇文章,《新知识|python连接mysql,使用mysqldb和mysqlclient、pymysql三者的异同》,经过对比,最终决定还是用pymysql插件,比较这个安装最简单了。

             监控Routine Load状态,最重要的就是要连上不同的database,执行show routine load 命令,并获取结果进行判断和预警。至于预警,很自然的我就想到了前几天学到的 钉钉机器人。后面发现,企业微信也有同款机器人,操作非常简单,这里也截图说明一下。

              首先,在企业微信中发起一个群聊。


             然后,在群设置中添加机器人。添加“添加机器人”,

            

             填写机器人信息:

             然后点击复制地址,即可完成。


           完成了前置准备工作,接下来直接上代码了。其实也很简单,就几十行Python。

        #!/usr/bin/python
        # -*- coding: UTF-8 -*-


        import pymysql
        import requests
        import sys
        if sys.getdefaultencoding() != 'utf-8':
        reload(sys)
        sys.setdefaultencoding('utf-8')


        SEND_URL='https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=${token}'


        def send_message(mess):
        print("收到的消息内如如下:%s"%mess)
        url = SEND_URL
        pagrem = {
        "msgtype": "text",
        "text": {
        "content": mess,
        "mentioned_list":["@all"],
        "mentioned_mobile_list":["@all"]
        },
        }
        headers = {
        'Content-Type': 'application/json'
        }
        print(json.dumps(pagrem))
        resp = requests.post(url, data=json.dumps(pagrem), headers=headers)
        print(resp.status_code)
        print(resp.text)




        def check_schema_load(schema):
        # 连接database
        conn = pymysql.connect(
        host='192.168.x.x',
        port=9030,
        user='root',
        password='xxxx',
        database=schema,
        charset='utf8')


        # 得到一个可以执行SQL语句的光标对象
        cursor = conn.cursor() # 执行完毕返回的结果集默认以元组显示
        # 得到一个可以执行SQL语句并且将结果作为字典返回的游标
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)

        # 定义要执行的SQL语句
        sql = """
        show routine load;
        """

        # 执行SQL语句
        cursor.execute(sql)
        res = cursor.fetchall()
        err_list=[]
        print(u"查询到DRP的routine load任务数为:%d"%len(res))
        for row in res:
        print(u"任务ID:%s 任务名:%s 目标数据库:%s 目标表名:%s 任务状态:%s"%(row['Id'],row['Name'],row['DbName'],row['TableName'],row['State']))
        if (row['State'] == "RUNNING"):
        continue
        else:
        row_mess = u"任务名称:%s 目标表名:%s 状态变化原因:%s 错误日志URL:%s 补充信息:%s "%(row['Name'],row['TableName'],row['ReasonOfStateChanged'],row['ErrorLogUrls'],row['OtherMsg'])
        err_list.append(row_mess)

        err_mess =u'检查到%s模式下的异常routine load有:\n'%schema
        if len(err_list) >0 :
        for index,mess in enumerate(err_list):
        print(mess)
        err_mess = err_mess + str(index+1) + '. ' + mess + '\n'
        send_message(err_mess)

        # 关闭光标对象
        cursor.close()
        # 关闭数据库连接
        conn.close()


        if __name__ == "__main__":
        schema_list = ['ods_drp','ods_hana','ods_e3_fx','ods_e3_jv','ods_e3_zy','ods_e3_pld','ods_rpa','ods_xgs','ods_xt1']
        for schema in schema_list:
        check_schema_load(schema)



              完成代码调试以后,企业微信群收到的消息截图如下:

            

               有了脚本以后,到服务器上安装pymysql  requests 两个包,进一步进行调试。

          pip install  pymysql  requests

                然后上传脚本,进行调试。

            python monitor_doris_routine_load_dd.py


            执行crontab 配置定时任务如下:

              0 7-23 * * * python data/cron_shell/monitor_doris_routine_load_dd.py >>/data/cron_shell/log/monitor_doris_routine_load_dd.log




              数据中台研习社》微信群,请添:laowang5244,备注【进群】


                                                                       🧐分享、点赞、在看,给个三连击呗!👇


              文章转载自数据中台研习社,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

              评论