暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

如何用触发器实现企业微信自动告警

海豚调度 2023-10-10
456

原理

触发器监控工作流实例表,当工作流实例表中的状态更新后,针对状态为失败的任务进行企业微信告警。

发送企业微信消息函数

    su - postgres
    # 必须在pg的主机上线安装requests模块
    pip install requests
    # 以postgres用户登陆psql客户端到etl数据库
    psql etl -U postgres
    # 创建插件plpython3u
    create extension plpython3u;
    # plpython3u为不受信语言,所以只能被超级用户使用
    # 在tool模式下建立发送企业微信消息函数tool.sp_send_wechat
    CREATE OR REPLACE FUNCTION tool.sp_send_wechat(message json, webhook character varying DEFAULT 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你自己的key'::character varying)
    RETURNS text
    LANGUAGE plpython3u
    SECURITY DEFINER
    AS $function$
    import requests
    import json
    """
    /*
    * 作者 : v-yuzhenc
    * 功能 : 给企业微信发送一条消息
    * message : 需要发送的消息,json格式
    * webhook : 企业微信机器人的webhook
    * */
    """
    import requests
    import json


    # 企业微信自定义机器人的webhook地址
    p_webhook = webhook
    # 要发送的消息内容
    p_message = json.loads(message)
    # 发送POST请求
    response = requests.post(p_webhook, data=json.dumps(p_message), headers={"Content-Type": "application/json"})


    # 打印响应结果
    return response.text
    $function$
    ;
    --将函数直接转给tool
    ALTER FUNCTION tool.sp_send_wechat(json, varchar) OWNER TO tool;
    --公开函数的执行权限
    GRANT ALL ON FUNCTION tool.sp_send_wechat(json, varchar) TO public;
    --将函数的执行权限授权给tool用户
    GRANT ALL ON FUNCTION tool.sp_send_wechat(json, varchar) TO tool;
    \q

    远程执行命令函数

    • • 由于海豚调度的任务日志是以文件的形式存储在操作系统中,所以,必须在数据库中实现这样一个函数,能够读取海豚服务器的日志文件。

      • su - postgres
        # 必须在pg的主机上安装paramiko模块
        pip install paramiko
        # 以postgres用户登陆psql客户端到etl数据库
        psql etl -U postgres
        # 上面已经创建了plpython3u插件,这里不需要再次建立了
        # 创建远程执行命令函数tool.sp_remote_exec_command_nopass
        CREATE OR REPLACE FUNCTION tool.sp_remote_exec_command_nopass(remote_command text, remote_host character varying DEFAULT 'dpmaster'::character varying, remote_port integer DEFAULT 22222, remote_username character varying DEFAULT 'dp'::character varying, remote_return_mode character varying DEFAULT 'stdout'::character varying)
        RETURNS text
        LANGUAGE plpython3u
        SECURITY DEFINER
        AS $function$
        import paramiko
        """
        /*
        * 作者 : v-yuzhenc
        * 功能 : 免密(需要配置ssh免密)在远程服务器执行一条命令
        * remote_command : 需要执行的命令
        * remote_host : 远程主机名或ip
        * remote_port : ssh端口
        * remote_username : 免密登陆的用户
        * remote_return_mode : 返回信息的模式,stderr返回标准错误信息,否则返回标准输出
        * */
        """
        # SSH连接信息
        host = remote_host
        port = remote_port
        username = remote_username
        private_key_path = '/home/postgres/.ssh/id_rsa'
        ssh_command = remote_command


        # 连接SSH服务器
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        private_key = paramiko.RSAKey.from_private_key_file(private_key_path)
        ssh.connect(host, port, username, pkey=private_key)


        # 通过SSH执行命令
        stdin, stdout, stderr = ssh.exec_command(ssh_command)
        p_stdout = stdout.read().decode().strip()
        p_stderr = stderr.read().decode().strip()
        # 关闭SSH连接
        ssh.close()
        # 打印响应结果
        if remote_return_mode == 'stderr':
        return None if p_stderr == '' else p_stderr
        else:
        return None if p_stdout == '' else p_stdout
        $function$
        ;
        -- Permissions
        ALTER FUNCTION tool.sp_remote_exec_command_nopass(text, varchar, int4, varchar, varchar) OWNER TO tool;
        GRANT ALL ON FUNCTION tool.sp_remote_exec_command_nopass(text, varchar, int4, varchar, varchar) TO tool;
        GRANT ALL ON FUNCTION tool.sp_remote_exec_command_nopass(text, varchar, int4, varchar, varchar) TO dp;
        \q

      获取子工作流实例下所有子任务实例

        su - postgres
        psql etl -U dp
        CREATE OR REPLACE FUNCTION dp.get_subtaskins_all(dp_p_ins_id integer)
        RETURNS integer[]
        LANGUAGE plpgsql
        SECURITY DEFINER
        AS $function$
        /*
        * 作者 : v-yuzhenc
        * 功能 : 根据工作流实例id获取子工作流实例下所有子任务实例,返回int类型的数组
        * dp_p_ins_id : 工作流实例id
        * */
        declare
        p_process_instance_id int := dp_p_ins_id;
        v_result int[];
        begin
        with recursive recursive_query as (
        -- 初始查询
        select
        parent_process_instance_id
        ,process_instance_id
        ,parent_task_instance_id
        from t_ds_relation_process_instance
        where parent_process_instance_id = p_process_instance_id
        union all
        -- 递归查询
        select
        t.parent_process_instance_id
        ,t.process_instance_id
        ,t.parent_task_instance_id
        from t_ds_relation_process_instance t
        inner join recursive_query r
        on (t.parent_process_instance_id = r.process_instance_id)
        ), tmp_b as (
        select parent_task_instance_id task_instance_id from recursive_query a
        union
        select b.id task_instance_id
        from recursive_query a, t_ds_task_instance b
        where a.process_instance_id = b.process_instance_id
        )
        select (string_to_array(string_agg(task_instance_id::text,','),','))::int[]
        into v_result
        from tmp_b
        ;
        return v_result;
        end;
        $function$
        ;


        -- Permissions


        ALTER FUNCTION dp.get_subtaskins_all(int4) OWNER TO dp;
        GRANT ALL ON FUNCTION dp.get_subtaskins_all(int4) TO dp;
        \q

        企业微信告警触发器

        • • 由于企业微信markdown格式的消息艾特指定的人只能通过企业微信中的userid(即用户在企业微信中的账号)调用,所以,我们在海豚调度的元数据表t_ds_user中增加wechat_userid字段,人工将海豚的用户对应的企业微信的userid维护上去

          • # 以dp用户登录etl数据库
            psql etl -U dp
            # 增加字段
            alter table t_ds_user add wechat_userid varchar(100);
            comment on column t_ds_user.wechat_userid is '对应的企业微信的userid';
            # 维护wechat_userid中的数据
            # 这里根据自己的企业实际情况做
            update t_ds_user
            set wechat_userid = 'YuZhenChao'
            where user_name = 'yuzhenchao'
            ;
            CREATE OR REPLACE FUNCTION dp.tg_ds_udef_alert_wechat()
            RETURNS trigger
            LANGUAGE plpgsql
            AS $function$
            /*
            * 作者:v-yuzhenc
            * 功能:海豚调度工作流失败自动告警
            * */
            declare
            i record;
            v_content text;
            v_message varchar;
            begin
            case
            when new.state = 6 then
            begin
            for i in (
            select
            '<@'||coalesce(d.wechat_userid,'')||'>\r\n# [DolphinScheduler Job ]\r\n> 实例 id : ['||coalesce(a.id::varchar,'')||'/'||coalesce(b.id::varchar,'')||'](https://dolphin.tclpv.com/dolphinscheduler/ui/projects/'||coalesce(g.code::varchar,'')||'/workflow/instances/'||coalesce(a.id::varchar,'')||'?code='||coalesce(a.process_definition_code::varchar,'')||')\r\n> 项目名称 : <font color=\"comment\">'||coalesce(g.name,'')||'('||coalesce(g.code::varchar,'')||')</font>'||'\r\n> 工作流名 : <font color=\"comment\">'||coalesce(e.name,'')||'('||coalesce(a.process_definition_code::varchar,'')||')</font>'||'\r\n> 任务名称 : <font color=\"comment\">'||coalesce(b.name,'')||'('||coalesce(b.task_code::varchar,'')||')</font>'||'\r\n> 任务类型 : <font color=\"comment\">'||coalesce(b.task_type,'')||'</font>\r\n> 开始时间 : <font color=\"comment\">'||to_char(b.start_time,'yyyy-mm-dd hh24:mi:ss')||'</font>\r\n> 结束时间 : <font color=\"comment\">'||to_char(b.end_time,'yyyy-mm-dd hh24:mi:ss')||'</font>\r\n> 所属用户 : <font color=\"comment\">'||coalesce(d.user_name,'')||'('||coalesce(c.user_id::varchar,'')||')</font>\r\n> 任务状态 : <font color=\"warning\">Failed</font>'||'\r\n> 报错信息 : <font color=\"warning\">'||coalesce(tool.sp_remote_exec_command_nopass($remote_command$cat $remote_command$||b.log_path||$remote_command$ | grep "\[ERROR\]\|等表超时" | awk -F' - ' '{print $2!=null ? $2 : $1}' | head -1 | sed 's/\"/\\\"/g'$remote_command$,split_part(b.host,':',1)),'')||'</font>' as wechat_content
            from t_ds_process_instance a
            inner join t_ds_task_instance b
            on (a.id = b.process_instance_id)
            inner join t_ds_task_definition c
            on (b.task_code = c.code and b.task_definition_version = c."version")
            inner join t_ds_user d
            on (c.user_id = d.id)
            inner join t_ds_process_definition e
            on (a.process_definition_code = e.code and a.process_definition_version = e."version")
            inner join t_ds_project g
            on (e.project_code = g.code)
            where c.task_type <> 'SUB_PROCESS'
            and a.state = 6
            and b.state = 6
            and a.id = new.id
            ) loop
            v_content := i.wechat_content;
            v_message := $v_message${
            "msgtype":"markdown",
            "markdown": {
            "content":"$v_message$||v_content||$v_message$"
            }
            }$v_message$;
            if v_message is not null then
            --告警
            perform tool.sp_send_wechat(v_message::json);
            end if;
            end loop;
            end;
            when new.state in (4,5) then
            begin
            --先告警
            select
            '# [DolphinScheduler Job ]\r\n> 实例 id : ['||coalesce(a.id::varchar,'')||'](https://dolphin.tclpv.com/dolphinscheduler/ui/projects/'||coalesce(g.code::varchar,'')||'/workflow/instances/'||coalesce(a.id::varchar,'')||'?code='||coalesce(a.process_definition_code::varchar,'')||')\r\n> 项目名称 : <font color=\"comment\">'||coalesce(g.name,'')||'('||coalesce(g.code::varchar,'')||')</font>'||'\r\n> 工作流名 : <font color=\"comment\">'||coalesce(e.name,'')||'('||coalesce(a.process_definition_code::varchar,'')||')</font>'||'\r\n> 开始时间 : <font color=\"comment\">'||to_char(a.start_time,'yyyy-mm-dd hh24:mi:ss')||'</font>\r\n> 结束时间 : <font color=\"comment\">'||to_char(coalesce(a.end_time,current_timestamp),'yyyy-mm-dd hh24:mi:ss')||'</font>'||'\r\n> 任务状态 : <font color=\"warning\">'||'Killed'||'</font>' as wechat_content
            into v_content
            from t_ds_process_instance a
            inner join t_ds_process_definition e
            on (a.process_definition_code = e.code and a.process_definition_version = e."version")
            inner join t_ds_project g
            on (e.project_code = g.code)
            where a.state in (4,5)
            and a.id = new.id
            ;
            v_message := $v_message${
            "msgtype":"markdown",
            "markdown": {
            "content":"$v_message$||v_content||$v_message$"
            }
            }$v_message$;
            if v_message is not null then
            --告警
            perform tool.sp_send_wechat(v_message::json);
            end if;
            --先把工作流对应的任务实例置为失败
            update t_ds_task_instance
            set state = 6
            where state not in (6,7)
            and id = any(get_subtaskins_all(new.id))
            ;
            --更新工作流状态为失败
            update t_ds_process_instance
            set state = 6
            where id = new.id
            ;
            end;
            else
            null;
            end case;
            return new;
            exception when others then
            insert into t_ds_trigger_log (trigger_name,error_msg) values ('tg_ds_udef_alert_wechat',sqlerrm);
            return new;
            end;
            $function$
            ;


            -- Permissions


            ALTER FUNCTION dp.tg_ds_udef_alert_wechat() OWNER TO dp;
            GRANT ALL ON FUNCTION dp.tg_ds_udef_alert_wechat() TO dp;




            # 创建时候触发器
            create trigger tg_state_ds_process_instance after update on dp.t_ds_process_instance for each row execute function dp.tg_ds_udef_alert_wechat();
            \q

          测试

          • • 新建一个工作流,选择SQL组件

          •  


          • • 保存工作流 



          • • 上线工作流并运行工作流  



          • • 工作流运行失败



          • • 随即企业微信来了消息提醒 



          实践案例

          奇富科技  蜀海供应链 联通数科 拈花云科

          蔚来汽车 长城汽车 集度 长安汽车

          思科网讯 生鲜电商 联通医疗 联想

          新网银行 消费金融  腾讯音乐 自如

          有赞 伊利 当贝大数据

          联想 传智教育 Bigo


          参与Apache DolphinScheduler 社区有非常多的参与贡献的方式,包括:


          贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。

          社区汇总了以下适合新手的问题列表:https://github.com/apache/dolphinscheduler/issues/5689

          非新手问题列表:https://github.com/apache/dolphinscheduler/issues?
          q=is%3Aopen+is%3Aissue+label%3A%22volunteer+wanted%22

          如何参与贡献链接:https://dolphinscheduler.apache.org/zh-cn/community/development/contribute.html

          来吧,DolphinScheduler开源社区需要您的参与,为中国开源崛起添砖加瓦吧,哪怕只是小小的一块瓦,汇聚起来的力量也是巨大的。

          文章转载自海豚调度,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

          评论