暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Prometheus监控Flink Metric实战详解及源码改造

大数据从业者 2022-09-18
5137

Prometheus架构说明

    Prometheus Server:负责监控数据的获取、存储、查询。
    exporter :提供http协议的采集服务;Prometheus Server访问 exporter以采集数据。
    Pushgateway:当Prometheus以 pull方式采集数据需要Prometheus Server与exporter 网络可达。而网络不可达的场景或者以push模式采集数据的场景,则使用 Pushgateway 进行中转(即内部网络主动push到Pushgateway,而prometheus 以pull方式从 pushgateway拉取数据。
    AlertManager:提供告警服务,支持基于 PromQL 创建告警规则,如果满足定义的规则,则产生告警信息,进入 AlertManager 进行处理,支持:邮件、微信、webhook 自定义报警等等。

    Prometheus安装部署

    官方安装包下载地址:

      https://prometheus.io/download/

      修改prometheus.yml配置:

      修改alertmanager.yml配置:

      设置告警规则:

      expr说明:

        基于job_name分组,1小时内flink_taskmanager_job_task_numRecordsInPerSecond + flink_taskmanager_job_task_numRecordsInPerSecond的总和等于0则告警。

        启动Promethues

          nohup ./prometheus --web.listen-address=":9090" --config.file=prometheus.yml --web.enable-admin-api > prometheus.log 2>&1 &

          启动Pushgateway

            nohup ./pushgateway --web.listen-address=":9091" > pushgateway.log 2>&1 &

            启动alertmanager

              nohup ./alertmanager --config.file=alertmanager.yml > app/log/alertmanager.log 2>&1 &

              Flink配置Prometheus参数

              Flink On Yarn多作业场景时,只能使用pushgateway模式。官网相关内容地址如下:

                https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/metric_reporters/#prometheuspushgateway

                配置示例:

                  metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
                  metrics.reporter.promgateway.host: felixzh
                  metrics.reporter.promgateway.port: 9091
                  metrics.reporter.promgateway.jobName: myJob
                  metrics.reporter.promgateway.randomJobNameSuffix: true
                  metrics.reporter.promgateway.deleteOnShutdown: false
                  #metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2
                  metrics.reporter.promgateway.interval: 60 SECONDS

                  拷贝相关jar

                    [root@felixzh flink-1.13.6]# cp plugins/metrics-prometheus/flink-metrics-prometheus-1.13.6.jar lib/

                    Flink示例作业

                      [root@felixzh conf]# nc -l 4444
                      [root@felixzh bin]# ./flink run -t yarn-per-job ../examples/streaming/SocketWindowWordCount.jar --hostname felixzh --port 4444

                      Pushgateway演示

                      浏览器访问Pushgateway,http://felixzh:9091,可以看到Flink metric数据已经成功上报。

                      Promethues演示

                      浏览器访问Promethues,http://felixzh:9090,可以看到Promethues已经从Pushgateway成功拉取到上报的Flink metric数据。

                      源码改造

                      细心的朋友应该发现一个问题:Flink metric数据上报到Pushgateway时,metrics.reporter.promgateway.jobName是配置文件设置的固定值+一个随机字符串。实际生产使用时,让用户保证每次提交作业时单独定制jobName与实际作业一一对应,对应很不现实,很容易出现Flink metric数据混乱。

                      解决思路:如果不想造成混乱,则需要找到一个唯一识别Flink作业的标识、并且自动设置为jobName。可以想到Flink作业提交到yarn集群所生成的applicationID可以唯一标识。

                      那么,如何将applicationID动态设置为jobName呢?

                      Flink HA配置参数high-availability.cluster-id的描述信息提供了一个思路。

                      运行在yarn模式和mesos模式时候,该参数无需设置,Flink会自行填充。而填充的内容其实就是applicationID。通过Debug Flink源码可以证明这一点,详细的Debug Flink源码的方法参见之前的文章。

                      下图可以看到:

                        high-availability.cluster-id= application_1659165319025_0027

                        解决方法:

                          high-availability.cluster-id值设置成metrics.reporter.promgateway.jobName

                          源码修改github地址如下:

                            https://github.com/felixzh2020/flink/commit/38d7bb4c85480fd182ee6808896727564606615c
                              //---------------add by felixzh start-------------------
                              //yarn mode, the default set (metrics.reporter.promgateway.jobName: myJob) leads to prometheus can't distinguish jobs.
                              //Dynamic assignment is applicationId
                              final String JOB_NAME_DEFAULT = "myJob";
                              final String METRICS_REPORTER_PROMGATEWAY_JOBNAME = "metrics.reporter.promgateway.jobName";
                              ConfigOptionmetricsReporterPromgatewayJobName = key(METRICS_REPORTER_PROMGATEWAY_JOBNAME)
                              .stringType()
                              .defaultValue(JOB_NAME_DEFAULT);
                              if(configuration.getValue(metricsReporterPromgatewayJobName).equals(JOB_NAME_DEFAULT)){
                              String applicationId = configuration.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
                              configuration.setString("metrics.reporter.promgateway.jobName", applicationId);
                              }
                              //---------------add by felixzh send-------------------

                              测试效果

                              提交一个Flink Socket流作业

                                 nc –l 4444
                                ./bin/flink run -t yarn-per-job examples/streaming/SocketWindowWordCount.jar --hostname 10.121.198.220 --port 4444

                                Pushgateway上报监控数据job名称已更新为applicationID

                                文章转载自大数据从业者,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                评论