接上一章节,继续阅读Receive组件的代码,上一节讲到了在获取到要将哪些指标送给哪些endpoint处理,就会丢给fanoutForward()并行处理。
fanoutForward() pkg/receive/handler.go
首先新建一个带超时时间的context,默认的超时时间为5秒,可通过命令行参数 receive-forwad-timeout设置。

循环遍历上一章节讲解到的 wreqs。这里回忆一下,wreps是一个map,key为endpoint的地址,value为需要处理的指标数据。
wreqs map[string]*prompb.WriteRequest
遍历wreps的逻辑如下:
首先waitGroup增加1,所有的处理放在goroutine内处理,在后续阻塞函数return时用到。

然后判断不是通过其它Receive实例转发的请求
!replicas[endpoint].replicated
且对数据需要进行多副本备份
h.options.ReplicationFactor > 1
那么这些数据就需要转发给其他的Receive实例。进行转发备份的逻辑在replicate方法内处理。

如果匹配到需要转发的endpoint正好是当前Receive实例,直接写时序数据库,后续存到对象存储中。

relicate pkg/receive/handler.go
我们再回过头看下relicate方法的详细逻辑。
首先根据需要备份几份通过Hash.GetN获取每一份交给哪个Receive实例处理,并获取对应实例的endpoint。

然后将这些key为endpoint的map再次调用fanoutForward方法。这也就有了在fanoutForward中看到的如果匹配的endpoint是自己,则就写时序数据库。
此时再次调用fanoutForward时多了两个不同点。
replicated 已经是true了,说明是已经处理过的备份数据了
计算去了一个quoum,这个值表示当处理备份副本大于多少就表示成功了,函数可以退出了。其余还在携程中处理的备份数是否成功我也不再关注了。

可以看到只要大于 (n+1) / 2 +1 个备份成功就算这次数据存储成功了。
我们再回到fanoutForward方法。
此时 relicated是true了,那么第一个进行备份处理的判断是不会再进入了

第二个判断是有可能进来的,如果要处理备份的实例正好是自己本身,那么将会写入时序数据库。

最后启动一个Goroutine发起调用其它的Receive实例。在Goroutine中,首先获取这个endpooint对应发起请求的客户端对象。

通过对peers的跟踪,可以知道其实就是一个Grpc的client,在服务启动时已经与其它的实例建立了连接。这个过程在后续阅读Receive集群实现的代码部分再详细讲解。
同时还会判断其他的实例是否已经准备好。

最后调用GRPC接口RemoteWrite将请求发送到其他的Receive实例。
这里需要注意,如果调用请求失败了而且返回的错误码是Unavailable的,那么将会将该节点设置一段时间内无法再处理请求,过了这个时间之后才能处理。

在获取到这个实例的请求客户端对象时也会判断是否在这个时间内,如果是的 会返回失败。

当receive接口返回错误,客户端Prometheus后续会发起重试。在fanoutForward最后,可以看到有个for select在监听信号。

首先这个死循环会阻塞着fanoutForward函数退出。除非收到了两个信号。
超时信号,这个信号在上文中有说个默认值是5秒。
从ec channel里面读取信号。
这个ec channel是什么呢?我们回到fanoutForward函数最开始,可以看到申请了一个无缓冲的channel

而且在每个判断里面处理完成后都会向ec写入失败的error或者成功写入nil。如果nil累加达到可以返回的数量了 (n+1) / 2 +1 该函数也会退出,不再关注剩余的副本数是否处理完成。
RemoteWrite pkg/receive/handler.go
在Receive实例之间通过GRPC调用,最终会调用的服务端(被调用方)的RemoteWrite接口。

可以看到在RemoteWrite接口内,最终调用的是handleRequest,是不是很熟悉。在上一章节讲Receive接收http请求的时候也会调用到这个方法里。
所以我们前两章节讲的内容即包含http请求的部分,也包含各Receive实例之间的调用,这也是为什么处理请求的接口里面需要增加有 replicated字段。且对该字段的处理、判断逻辑还非常多。

在没有读到Grpc接口也走的handleRequest时,可能很难理解这里的replicated参数的含义。
到这里相信大家已经理解,其实就是:这份数据是否是需要进行备份处理的数据,还是初次通过RemoteWrite接口获取到的数据。
本系列回顾:




