Prometheus架构说明

Prometheus Server:负责监控数据的获取、存储、查询。exporter :提供http协议的采集服务;Prometheus Server访问 exporter以采集数据。Pushgateway:当Prometheus以 pull方式采集数据需要Prometheus Server与exporter 网络可达。而网络不可达的场景或者以push模式采集数据的场景,则使用 Pushgateway 进行中转(即内部网络主动push到Pushgateway,而prometheus 以pull方式从 pushgateway拉取数据。AlertManager:提供告警服务,支持基于 PromQL 创建告警规则,如果满足定义的规则,则产生告警信息,进入 AlertManager 进行处理,支持:邮件、微信、webhook 自定义报警等等。
Prometheus安装部署
官方安装包下载地址:
https://prometheus.io/download/



修改prometheus.yml配置:

修改alertmanager.yml配置:

设置告警规则:

expr说明:
基于job_name分组,1小时内flink_taskmanager_job_task_numRecordsInPerSecond + flink_taskmanager_job_task_numRecordsInPerSecond的总和等于0则告警。
启动Promethues
nohup ./prometheus --web.listen-address=":9090" --config.file=prometheus.yml --web.enable-admin-api > prometheus.log 2>&1 &
启动Pushgateway
nohup ./pushgateway --web.listen-address=":9091" > pushgateway.log 2>&1 &
启动alertmanager
nohup ./alertmanager --config.file=alertmanager.yml > app/log/alertmanager.log 2>&1 &
Flink配置Prometheus参数
Flink On Yarn多作业场景时,只能使用pushgateway模式。官网相关内容地址如下:
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/metric_reporters/#prometheuspushgateway
配置示例:
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReportermetrics.reporter.promgateway.host: felixzhmetrics.reporter.promgateway.port: 9091metrics.reporter.promgateway.jobName: myJobmetrics.reporter.promgateway.randomJobNameSuffix: truemetrics.reporter.promgateway.deleteOnShutdown: false#metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2metrics.reporter.promgateway.interval: 60 SECONDS

拷贝相关jar
[root@felixzh flink-1.13.6]# cp plugins/metrics-prometheus/flink-metrics-prometheus-1.13.6.jar lib/
Flink示例作业
[root@felixzh conf]# nc -l 4444[root@felixzh bin]# ./flink run -t yarn-per-job ../examples/streaming/SocketWindowWordCount.jar --hostname felixzh --port 4444

Pushgateway演示
浏览器访问Pushgateway,http://felixzh:9091,可以看到Flink metric数据已经成功上报。

Promethues演示
浏览器访问Promethues,http://felixzh:9090,可以看到Promethues已经从Pushgateway成功拉取到上报的Flink metric数据。

源码改造
细心的朋友应该发现一个问题:Flink metric数据上报到Pushgateway时,metrics.reporter.promgateway.jobName是配置文件设置的固定值+一个随机字符串。实际生产使用时,让用户保证每次提交作业时单独定制jobName与实际作业一一对应,对应很不现实,很容易出现Flink metric数据混乱。
解决思路:如果不想造成混乱,则需要找到一个唯一识别Flink作业的标识、并且自动设置为jobName。可以想到Flink作业提交到yarn集群所生成的applicationID可以唯一标识。
那么,如何将applicationID动态设置为jobName呢?
Flink HA配置参数high-availability.cluster-id的描述信息提供了一个思路。

运行在yarn模式和mesos模式时候,该参数无需设置,Flink会自行填充。而填充的内容其实就是applicationID。通过Debug Flink源码可以证明这一点,详细的Debug Flink源码的方法参见之前的文章。
下图可以看到:
high-availability.cluster-id= application_1659165319025_0027

解决方法:
将high-availability.cluster-id值设置成metrics.reporter.promgateway.jobName。
源码修改github地址如下:
https://github.com/felixzh2020/flink/commit/38d7bb4c85480fd182ee6808896727564606615c
//---------------add by felixzh start-------------------//yarn mode, the default set (metrics.reporter.promgateway.jobName: myJob) leads to prometheus can't distinguish jobs.//Dynamic assignment is applicationIdfinal String JOB_NAME_DEFAULT = "myJob";final String METRICS_REPORTER_PROMGATEWAY_JOBNAME = "metrics.reporter.promgateway.jobName";ConfigOptionmetricsReporterPromgatewayJobName = key(METRICS_REPORTER_PROMGATEWAY_JOBNAME).stringType().defaultValue(JOB_NAME_DEFAULT);if(configuration.getValue(metricsReporterPromgatewayJobName).equals(JOB_NAME_DEFAULT)){String applicationId = configuration.getString(HighAvailabilityOptions.HA_CLUSTER_ID);configuration.setString("metrics.reporter.promgateway.jobName", applicationId);}//---------------add by felixzh send-------------------
测试效果
提交一个Flink Socket流作业
nc –l 4444./bin/flink run -t yarn-per-job examples/streaming/SocketWindowWordCount.jar --hostname 10.121.198.220 --port 4444

Pushgateway上报监控数据job名称已更新为applicationID





