随着分布式集群规模的扩大,以及系统复杂度的增加,单纯靠人力监控几乎不能满足需求,必须有一套监控体系能时刻监控系统的动态,最大程度地放大低概率发生的系统故障,才能使维护人员及时发现并维护,从而最小程度的减少业务损失。Flink作为当下最流行的实时分布式计算系统,不同的业务可能需要关注不同的指标,有时候需要自定义指标才能满足需求。我们可以通过分析Flink的分布式监控框架,掌握其脉络,从而达到自定义监控指标的目的。Flink提供了4类指标监控,分别是:
Gauge :用来监控一个指标的瞬时值。比如某一瞬间TaskManager的内存使用。
Histogram:用来对指标进行统计展示。比如某个指标在一段时间内的max、min值等。
Meter:用来监控一个指标某时间段的平均值。比如task任务每秒接收的记录数。
Counter:用来统计一个指标的总量。比如task接收记录的总数、发送记录的总数等。
为了方便对单个指标的管理,Flink提供了指标组MetricGroup概念。
Flink监控架构如下:

上图主要是task相关的指标架构,所以只是画了TaskMamagerRunner中启动的MetricQueryService 。Flink有2种获取监控指标的方式,一个是pull,一个是push。
其中MetricRegistry是指标注册中心,与MetricQueryService和MetricReport交互,主要是实现添加指标、删除指标动作。
MetricQueryService 是一个独立的服务,用于实现rest api拉取系统监控指标。在TaskMamager节点上有TaskMamagerRunner负责启动,如WebMonitorEndpoint是Flink UI的具体实现类,其会定期向MetricQueryService发出拉起task相关指标的请求并展示。
MetricReporter 也是一个单独的程序,其会主动push指标到某个外部系统。图中是以prometheus为例。
下面通过源码分析。分为二部分讲解,指标更新和指标获取。
1、指标如何更新。


通过读取配置实例化MetricRegistryImpl注册中心,report相关配置入口是metrics.reporters,本例中reporter是prometheus;然后需要定义每个reporter相关的配置,包括入口类class,定期report 的时间间隔interval等。随后MetricRegistryImpl实例化一个MetricsQueryService并启动,其底层是基于AKKA实现,能够响应如下事件。addMetric、removeMetric、queryMetrics方法。

上面的注册的metric最终会和task任务相关联,由于内容较多,暂不枚举,读者可跟踪代码调试。我们以Count指标为例看下其数据是如何更新的。

在StreamTask中会根据是本地任务还是远程任务创建localInputGate和RemoteInputGate,InputGatewayWithMetrics是一个带Metrics的类。其pollNext方法用于获取下一条数据,inputGate.getNext会获取数据并处理,结束之后会调用updatemetrics函数进行numBytesIn的指标更新,numBytesIn是一个Counter类型,完成对当前task接收的数据进行计数。可以看出这只是多了一次计算,对性能的影响是很小的。
2.1、通过rest拉取指标获取


上面是MetricQueryService的添加删除指标和查询指标方法定义,其根据类别分别添加到不同的指标列表中。

当MetricsQueryService启动后,就可以通过rest API调用queryMetrics方法获取Counter的值。具体可参考WebMonitorEndpoint的fetcher实现自定义拉取。
2.2 通过report推送指标


看下MetricReporter接口的定义,主要有4个接口,每一个自定义的report都要实现这4个接口。其中open用于和外部系统建立连接,close是关闭连接,剩下的2 个是添加删除指标。本例以流行的prometheus为例分析。由于prometheus不属于flink系统,所以AbstractPrometheusReporter继承自MetricReporter,实现notifyOfAddedMetric和notifyOfRemovedMetric接口。

PrometheusPushGatewayReporter实现了open和close方法。在open方法里通过host和Port实例化prometheus的PushGateway对象。在report方法中调用pushGateway进行推送。

下面看下reprort是如何启动的。跳到MetricRegisterImpl类的初始化方法中。


在MetricRegisterImpl初始化的时候会读取配置获取reporter,并判断是否继承里Scheduled类,进行以固定间隔调用reporter.report汇报指标。MetricRegisterImpl的register对应添加指标,会调用reporter的notifyOfAddedMetric方法,unregistered对于删除指标,会调用reporter的notifyOfRemovedMetric方法。
Flink的metricSystem框架设计的非常巧妙,既可以通过pull的方法拉取指标,也可以push方式推送到自定义的系统,且线程拆分的很轻,对性能影响很小。




