上一章节我们了解了Grafana Tempo的Distributor是如何接收数据然后扭转至Distributor.PushTraces方法的,而PushTraces方法是Distributor的处理核心代码逻辑实现,本节将带领大家对PushTraces方法的处理逻辑做详细解读。
下面是PushTraces方法的完整代码:
// PushTraces pushes a batch of traces
func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*tempopb.PushResponse, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "distributor.PushTraces")
defer span.Finish()
userID, spanCount, size, err := d.extractBasicInfo(ctx, traces)
if err != nil {
// can't record discarded spans here b/c there's no tenant
return nil, err
}
if spanCount == 0 {
return &tempopb.PushResponse{}, nil
}
// check limits
err = d.checkForRateLimits(size, spanCount, userID)
if err != nil {
return nil, err
}
// Convert to bytes and back. This is unfortunate for efficiency, but it works
// around the otel-collector internalization of otel-proto which Tempo also uses.
convert, err := (&ptrace.ProtoMarshaler{}).MarshalTraces(traces)
if err != nil {
return nil, err
}
// tempopb.Trace is wire-compatible with ExportTraceServiceRequest
// used by ToOtlpProtoBytes
trace := tempopb.Trace{}
err = trace.Unmarshal(convert)
if err != nil {
return nil, err
}
batches := trace.Batches
if d.cfg.LogReceivedSpans.Enabled {
logSpans(batches, &d.cfg.LogReceivedSpans, d.logger)
}
if d.cfg.MetricReceivedSpans.Enabled {
metricSpans(batches, userID, &d.cfg.MetricReceivedSpans)
}
metricBytesIngested.WithLabelValues(userID).Add(float64(size))
metricSpansIngested.WithLabelValues(userID).Add(float64(spanCount))
keys, rebatchedTraces, err := requestsByTraceID(batches, userID, spanCount)
if err != nil {
overrides.RecordDiscardedSpans(spanCount, reasonInternalError, userID)
return nil, err
}
err = d.sendToIngestersViaBytes(ctx, userID, rebatchedTraces, keys)
if err != nil {
recordDiscaredSpans(err, userID, spanCount)
return nil, err
}
if len(d.overrides.MetricsGeneratorProcessors(userID)) > 0 {
d.generatorForwarder.SendTraces(ctx, userID, keys, rebatchedTraces)
}
if err := d.forwardersManager.ForTenant(userID).ForwardTraces(ctx, traces); err != nil {
_ = level.Warn(d.logger).Log("msg", "failed to forward batches for tenant=%s: %w", userID, err)
}
return nil, nil // PushRequest is ignored, so no reason to create one
}
通过函数的定义,可以看到请求参数为接收的Trace结构,返回参数为调用sendToIngestersViaBytes的返回结构。
PushTraces(ctx context.Context, traces ptrace.Traces) (*tempopb.PushResponse, error)
读取租户信息
从请求中获取租户名称(上一节在初始化Distributor时注册的Middleware中写入orgIDContextKey中获取)。
从请求中获取当前接收的Trace包含的span数量如果数量为0则直接返回。同时还获取这一次请求的数据包大小,用来判断是否对Ingress限流处理。

限流
根据上一步中获取到的包大小进行限流判断,该函数将会根据ingestionRateLimiter配置判断是否触发限流。

在构建Distributor时会根据overrides的rate_stragy和rate_limit_bytes构建ingestionRateLimiter对象。

ingestionRateLimiter表示10秒内distributor接收的包超过多少则进行限流。

在官网配置的讲解中可以看到,rate_stragy确定限流配置是给distributor和ingestor共用,还是两者单独配置;rate_limit_bytes表示限流的包大小,默认为15MB

转化结构和记录日志
将Trace数据序列化为tempopb.Trace结构,并根据配置项log_received_spans、metric_received_spans 确定是否记录接收的Trace详细日志和记录Trace指标,这两个配置项一般用作debug排查错误,不建议在生产环境开启。

计算hashkey
将接收的Trace根据span、租户ID计算对应的hashkey,并返回各个span对应的hash值。
这里计算hashkey是通过hash一致性算法将span分别交给不同的ingestor实例进行处理,起到负载均衡的作用。关于hash一致性算法大家感兴趣的可以搜索学习,后续将会有单独的文章来讲解hash一致性算法。

将数据发送至Ingress
在sendToIngestersViaBytes 中通过ring.DoBatch 函数真正的处理发送,ring.DoBatch 根据hash一致性算法取出该key对应的span将发送在hash环上的哪个ingress实例,并通过grpc方法PushBytesV2发送给Ingress实例

将数据发送给ingress后再执行异步的forwarder操作

IngesterRing
在发送指标给Ingester时,IngesterRing是什么时候、怎么初始化的呢?
在前初始化module时,有讲到初始化对应module前将会初始化他所依赖的模块,而在初始化Distributor前就已经初始化好了IngesterRing模块。

该模块由配置文件ingester的lifecycler定义

以上为Distributor组件对Trace数据处理和发送至Ingester组件的代码讲解。下一节将带来数据发送至Ingeste组件后,Ingeste组件如何进行处理并写入存储。
往期回顾:
Grafana Tempo源码解读(一)Distributor建立监听接收Trace数据




