原理
触发器监控工作流实例表:当工作流实例的状态被更新为失败(state = 6)时,针对该实例下状态为失败的任务逐个发送企业微信告警;当状态被更新为 4 或 5(告警中显示为 Killed)时,先发送一条终止告警,再把该实例及其未结束的任务实例置为失败。
发送企业微信消息函数
# Switch to the postgres OS user on the PostgreSQL host.
su - postgres
# The requests module must be installed on the PostgreSQL host.
pip install requests
# Open a psql session on the etl database as the postgres role.
psql etl -U postgres

-- Create the plpython3u extension.
-- plpython3u is an untrusted language, so it can only be used by superusers.
create extension if not exists plpython3u;

-- Create the WeCom message sender tool.sp_send_wechat in the tool schema.
CREATE OR REPLACE FUNCTION tool.sp_send_wechat(
    message json,
    webhook character varying DEFAULT 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你自己的key'::character varying
)
RETURNS text
LANGUAGE plpython3u
SECURITY DEFINER
AS $function$
# Author  : v-yuzhenc
# Purpose : send one message to a WeCom (Enterprise WeChat) robot.
# message : the message to send, as JSON
# webhook : webhook URL of the WeCom robot
# Returns : the body text of the HTTP response
# Raises  : any requests exception (connection error, timeout, ...) propagates
#           to the caller as a PostgreSQL error.
import json

import requests

# Upper bound, in seconds, for connecting to and reading from the webhook.
# Without it a stalled HTTP call would block the calling session forever.
REQUEST_TIMEOUT_SECONDS = 10

# Parse the incoming JSON so that malformed input fails here, before any
# network traffic, then serialise it again for the request body.
payload = json.loads(message)

response = requests.post(
    webhook,
    data=json.dumps(payload),
    headers={"Content-Type": "application/json"},
    timeout=REQUEST_TIMEOUT_SECONDS,
)

# Hand the raw response text back to the SQL caller.
return response.text
$function$;

-- Hand the function over to the tool role.
ALTER FUNCTION tool.sp_send_wechat(json, varchar) OWNER TO tool;
-- Open the function to every role.
-- NOTE(review): the webhook URL is a caller-supplied argument and the function
-- is SECURITY DEFINER, so this grant lets any role make the database host
-- POST to an arbitrary URL. Confirm that this is intended.
GRANT ALL ON FUNCTION tool.sp_send_wechat(json, varchar) TO public;
-- Grant the function to the tool role.
GRANT ALL ON FUNCTION tool.sp_send_wechat(json, varchar) TO tool;
\q
远程执行命令函数
• 由于海豚调度的任务日志是以文件的形式存储在操作系统中,所以,必须在数据库中实现这样一个函数,能够读取海豚服务器的日志文件。
# Switch to the postgres OS user on the PostgreSQL host.
su - postgres
# The paramiko module must be installed on the PostgreSQL host.
pip install paramiko
# Open a psql session on the etl database as the postgres role.
psql etl -U postgres

-- The plpython3u extension has to exist already; it is not created here.
-- Create the remote command function tool.sp_remote_exec_command_nopass.
CREATE OR REPLACE FUNCTION tool.sp_remote_exec_command_nopass(
    remote_command text,
    remote_host character varying DEFAULT 'dpmaster'::character varying,
    remote_port integer DEFAULT 22222,
    remote_username character varying DEFAULT 'dp'::character varying,
    remote_return_mode character varying DEFAULT 'stdout'::character varying
)
RETURNS text
LANGUAGE plpython3u
SECURITY DEFINER
AS $function$
# Author             : v-yuzhenc
# Purpose            : run one command on a remote server over SSH using key
#                      authentication (passwordless SSH must be configured).
# remote_command     : the command to execute
# remote_host        : remote host name or IP
# remote_port        : SSH port
# remote_username    : user of the passwordless login
# remote_return_mode : 'stderr' returns the command's standard error,
#                      anything else returns its standard output
# Returns            : the selected stream, decoded and stripped, or NULL when
#                      it is empty
# Raises             : paramiko / socket errors propagate to the caller as a
#                      PostgreSQL error.
import paramiko

# Private key used for the SSH login.
PRIVATE_KEY_PATH = '/home/postgres/.ssh/id_rsa'
# Upper bound, in seconds, for establishing the TCP connection.
CONNECT_TIMEOUT_SECONDS = 10

ssh = paramiko.SSHClient()
# NOTE(review): AutoAddPolicy accepts host keys that are not yet known,
# so the remote host's identity is not verified.
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
    private_key = paramiko.RSAKey.from_private_key_file(PRIVATE_KEY_PATH)
    ssh.connect(
        remote_host,
        remote_port,
        remote_username,
        pkey=private_key,
        timeout=CONNECT_TIMEOUT_SECONDS,
    )
    # Run the command and collect both output streams.
    stdin, stdout, stderr = ssh.exec_command(remote_command)
    p_stdout = stdout.read().decode().strip()
    p_stderr = stderr.read().decode().strip()
finally:
    # Always release the connection, including when connect/exec raised.
    ssh.close()

# An empty stream is reported as NULL.
if remote_return_mode == 'stderr':
    return p_stderr or None
return p_stdout or None
$function$;

-- Permissions
ALTER FUNCTION tool.sp_remote_exec_command_nopass(text, varchar, int4, varchar, varchar) OWNER TO tool;
GRANT ALL ON FUNCTION tool.sp_remote_exec_command_nopass(text, varchar, int4, varchar, varchar) TO tool;
GRANT ALL ON FUNCTION tool.sp_remote_exec_command_nopass(text, varchar, int4, varchar, varchar) TO dp;
\q
获取子工作流实例下所有子任务实例
# Switch to the postgres OS user, then open a psql session on the etl
# database as the dp role.
su - postgres
psql etl -U dp

CREATE OR REPLACE FUNCTION dp.get_subtaskins_all(dp_p_ins_id integer)
RETURNS integer[]
LANGUAGE plpgsql
SECURITY DEFINER
AS $function$
/*
 * Author      : v-yuzhenc
 * Purpose     : collect the task instance ids that belong to the sub-workflow
 *               instances below a workflow instance.
 * dp_p_ins_id : id of the (parent) workflow instance
 * Returns     : int[] holding
 *                 - parent_task_instance_id of every relation row reachable
 *                   from dp_p_ins_id, and
 *                 - the id of every task instance whose process_instance_id is
 *                   one of the reachable child workflow instances;
 *               NULL when nothing is found.
 */
declare
    v_result int[];
begin
    with recursive sub_instances as (
        -- Anchor: direct children of the requested workflow instance.
        select
            parent_process_instance_id,
            process_instance_id,
            parent_task_instance_id
        from t_ds_relation_process_instance
        where parent_process_instance_id = dp_p_ins_id
        union all
        -- Recursive step: children of the instances found so far.
        select
            t.parent_process_instance_id,
            t.process_instance_id,
            t.parent_task_instance_id
        from t_ds_relation_process_instance t
        inner join sub_instances r
            on (t.parent_process_instance_id = r.process_instance_id)
    ),
    task_ids as (
        -- Task instances recorded as the parent task of a relation row.
        select s.parent_task_instance_id as task_instance_id
        from sub_instances s
        -- UNION (not UNION ALL): the same id must be reported only once.
        union
        -- Task instances that run inside the child workflow instances.
        select ti.id as task_instance_id
        from sub_instances s
        inner join t_ds_task_instance ti
            on (s.process_instance_id = ti.process_instance_id)
    )
    -- NULL ids are skipped; an empty input yields NULL rather than '{}'.
    select (array_agg(task_instance_id) filter (where task_instance_id is not null))::int[]
    into v_result
    from task_ids;

    return v_result;
end;
$function$;

-- Permissions
ALTER FUNCTION dp.get_subtaskins_all(int4) OWNER TO dp;
GRANT ALL ON FUNCTION dp.get_subtaskins_all(int4) TO dp;
\q
企业微信告警触发器
• 由于企业微信 markdown 格式的消息只能通过企业微信中的 userid(即用户在企业微信中的账号)来 @ 指定的人,所以我们在海豚调度的元数据表 t_ds_user 中增加 wechat_userid 字段,并人工维护每个海豚用户对应的企业微信 userid。
# Open a psql session on the etl database as the dp role.
psql etl -U dp

-- Add the column that maps a DolphinScheduler user to a WeCom userid.
alter table t_ds_user add column if not exists wechat_userid varchar(100);
comment on column t_ds_user.wechat_userid is '对应的企业微信的userid';

-- Maintain the wechat_userid data.
-- Adapt this to your own organisation.
update t_ds_user
set wechat_userid = 'YuZhenChao'
where user_name = 'yuzhenchao';

-- Error log written by the trigger function's exception handlers. If the
-- table were missing, the handler itself would fail and the error would reach
-- the statement that updated t_ds_process_instance.
create table if not exists t_ds_trigger_log (
    trigger_name text,
    error_msg    text,
    created_at   timestamptz not null default current_timestamp
);

CREATE OR REPLACE FUNCTION dp.tg_ds_udef_alert_wechat()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
/*
 * Author  : v-yuzhenc
 * Purpose : automatic WeCom alerting for failed DolphinScheduler workflows.
 *
 * new.state = 6      : sends one markdown alert per task instance of the
 *                      workflow instance that is in state 6 (SUB_PROCESS
 *                      tasks are skipped). The alert carries the first
 *                      matching error line of the task log, read from the
 *                      host the task ran on.
 * new.state in (4,5) : sends one "Killed" alert for the workflow instance,
 *                      then sets the task instances returned by
 *                      dp.get_subtaskins_all (unless already in state 6 or 7)
 *                      and the workflow instance itself to state 6. That
 *                      update fires this trigger again with new.state = 6.
 * any other state    : no action.
 *
 * Any error is written to t_ds_trigger_log and swallowed so that the
 * triggering update is not aborted.
 *
 * The message text is spliced into a JSON document as-is: the literal \r\n
 * and \" sequences in the strings below are JSON escapes, not SQL escapes.
 */
declare
    i record;
    v_error text;
    v_content text;
    v_message varchar;
begin
    case
    when new.state = 6 then
        for i in
            select
                '<@'||coalesce(d.wechat_userid,'')
                ||'>\r\n# [DolphinScheduler Job ]\r\n> 实例 id : ['||coalesce(a.id::varchar,'')||'/'||coalesce(b.id::varchar,'')
                ||'](https://dolphin.tclpv.com/dolphinscheduler/ui/projects/'||coalesce(g.code::varchar,'')
                ||'/workflow/instances/'||coalesce(a.id::varchar,'')
                ||'?code='||coalesce(a.process_definition_code::varchar,'')
                ||')\r\n> 项目名称 : <font color=\"comment\">'||coalesce(g.name,'')||'('||coalesce(g.code::varchar,'')||')</font>'
                ||'\r\n> 工作流名 : <font color=\"comment\">'||coalesce(e.name,'')||'('||coalesce(a.process_definition_code::varchar,'')||')</font>'
                ||'\r\n> 任务名称 : <font color=\"comment\">'||coalesce(b.name,'')||'('||coalesce(b.task_code::varchar,'')||')</font>'
                ||'\r\n> 任务类型 : <font color=\"comment\">'||coalesce(b.task_type,'')
                -- A NULL timestamp must not turn the whole message into NULL,
                -- which would silently drop the alert.
                ||'</font>\r\n> 开始时间 : <font color=\"comment\">'||coalesce(to_char(b.start_time,'yyyy-mm-dd hh24:mi:ss'),'')
                ||'</font>\r\n> 结束时间 : <font color=\"comment\">'||coalesce(to_char(b.end_time,'yyyy-mm-dd hh24:mi:ss'),'')
                ||'</font>\r\n> 所属用户 : <font color=\"comment\">'||coalesce(d.user_name,'')||'('||coalesce(c.user_id::varchar,'')||')</font>\r\n> 任务状态 : <font color=\"warning\">Failed</font>'
                ||'\r\n> 报错信息 : <font color=\"warning\">' as content_head,
                b.log_path,
                split_part(b.host,':',1) as log_host
            from t_ds_process_instance a
            inner join t_ds_task_instance b
                on (a.id = b.process_instance_id)
            inner join t_ds_task_definition c
                on (b.task_code = c.code and b.task_definition_version = c."version")
            inner join t_ds_user d
                on (c.user_id = d.id)
            inner join t_ds_process_definition e
                on (a.process_definition_code = e.code and a.process_definition_version = e."version")
            inner join t_ds_project g
                on (e.project_code = g.code)
            where c.task_type <> 'SUB_PROCESS'
              and a.state = 6
              and b.state = 6
              and a.id = new.id
        loop
            -- Read the first error line of the task log from the task's host.
            -- This is isolated in its own block so that an unreachable host
            -- or missing log only empties the error text instead of
            -- cancelling the alert for this and all remaining tasks.
            v_error := null;
            if i.log_path is not null then
                begin
                    v_error := tool.sp_remote_exec_command_nopass(
                        $remote_command$cat $remote_command$||i.log_path||$remote_command$ | grep "\[ERROR\]\|等表超时" | awk -F' - ' '{print $2!=null ? $2 : $1}' | head -1 | sed 's/\"/\\\"/g'$remote_command$,
                        i.log_host
                    );
                exception when others then
                    insert into t_ds_trigger_log (trigger_name,error_msg) values ('tg_ds_udef_alert_wechat',sqlerrm);
                    v_error := null;
                end;
            end if;

            v_content := i.content_head||coalesce(v_error,'')||'</font>';
            v_message := $v_message${"msgtype":"markdown","markdown": {"content":"$v_message$||v_content||$v_message$"}}$v_message$;
            if v_message is not null then
                -- Send the alert.
                perform tool.sp_send_wechat(v_message::json);
            end if;
        end loop;

    when new.state in (4,5) then
        -- Alert first.
        select
            '# [DolphinScheduler Job ]\r\n> 实例 id : ['||coalesce(a.id::varchar,'')
            ||'](https://dolphin.tclpv.com/dolphinscheduler/ui/projects/'||coalesce(g.code::varchar,'')
            ||'/workflow/instances/'||coalesce(a.id::varchar,'')
            ||'?code='||coalesce(a.process_definition_code::varchar,'')
            ||')\r\n> 项目名称 : <font color=\"comment\">'||coalesce(g.name,'')||'('||coalesce(g.code::varchar,'')||')</font>'
            ||'\r\n> 工作流名 : <font color=\"comment\">'||coalesce(e.name,'')||'('||coalesce(a.process_definition_code::varchar,'')||')</font>'
            ||'\r\n> 开始时间 : <font color=\"comment\">'||coalesce(to_char(a.start_time,'yyyy-mm-dd hh24:mi:ss'),'')
            ||'</font>\r\n> 结束时间 : <font color=\"comment\">'||to_char(coalesce(a.end_time,current_timestamp),'yyyy-mm-dd hh24:mi:ss')||'</font>'
            ||'\r\n> 任务状态 : <font color=\"warning\">'||'Killed'||'</font>' as wechat_content
        into v_content
        from t_ds_process_instance a
        inner join t_ds_process_definition e
            on (a.process_definition_code = e.code and a.process_definition_version = e."version")
        inner join t_ds_project g
            on (e.project_code = g.code)
        where a.state in (4,5)
          and a.id = new.id;

        v_message := $v_message${"msgtype":"markdown","markdown": {"content":"$v_message$||v_content||$v_message$"}}$v_message$;
        if v_message is not null then
            -- Send the alert.
            perform tool.sp_send_wechat(v_message::json);
        end if;

        -- First mark the task instances of the workflow as failed.
        -- The helper is schema-qualified so the call does not depend on the
        -- search_path of the session that fires the trigger.
        update t_ds_task_instance
        set state = 6
        where state not in (6,7)
          and id = any(dp.get_subtaskins_all(new.id));

        -- Then mark the workflow instance itself as failed.
        update t_ds_process_instance
        set state = 6
        where id = new.id;

    else
        null;
    end case;

    return new;
exception when others then
    insert into t_ds_trigger_log (trigger_name,error_msg) values ('tg_ds_udef_alert_wechat',sqlerrm);
    return new;
end;
$function$;

-- Permissions
ALTER FUNCTION dp.tg_ds_udef_alert_wechat() OWNER TO dp;
GRANT ALL ON FUNCTION dp.tg_ds_udef_alert_wechat() TO dp;

-- Create the trigger (dropped first so the script can be re-run).
-- NOTE(review): the trigger fires on every row update, not only when state
-- changes, so a later update of a row that is still in state 6 sends the
-- alerts again. Consider guarding on old.state is distinct from new.state
-- once the update pattern of the scheduler has been confirmed.
drop trigger if exists tg_state_ds_process_instance on dp.t_ds_process_instance;
create trigger tg_state_ds_process_instance
    after update on dp.t_ds_process_instance
    for each row
    execute function dp.tg_ds_udef_alert_wechat();
\q
测试
• 新建一个工作流,选择SQL组件

• 保存工作流

• 上线工作流并运行工作流

• 工作流运行失败

• 随即企业微信来了消息提醒


文章转载自海豚调度,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




