暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Grafana Tempo源码解读(二)Distributor对Trace数据的处理和发送至Ingester

栋总侃技术 2023-12-14
76

上一章节我们了解了Grafana Tempo的Distributor是如何接收数据然后扭转至Distributor.PushTraces方法的,而PushTraces方法是Distributor的处理核心代码逻辑实现,本节将带领大家对PushTraces方法的处理逻辑做详细解读。

下面是PushTraces方法的完整代码:

// PushTraces pushes a batch of traces
func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*tempopb.PushResponse, error) {
  span, ctx := opentracing.StartSpanFromContext(ctx, "distributor.PushTraces")
  defer span.Finish()

  userID, spanCount, size, err := d.extractBasicInfo(ctx, traces)
  if err != nil {
    // can't record discarded spans here b/c there's no tenant
    return nil, err
  }
  if spanCount == 0 {
    return &tempopb.PushResponse{}, nil
  }
  // check limits
  err = d.checkForRateLimits(size, spanCount, userID)
  if err != nil {
    return nil, err
  }

  // Convert to bytes and back. This is unfortunate for efficiency, but it works
  // around the otel-collector internalization of otel-proto which Tempo also uses.
  convert, err := (&ptrace.ProtoMarshaler{}).MarshalTraces(traces)
  if err != nil {
    return nil, err
  }

  // tempopb.Trace is wire-compatible with ExportTraceServiceRequest
  // used by ToOtlpProtoBytes
  trace := tempopb.Trace{}
  err = trace.Unmarshal(convert)
  if err != nil {
    return nil, err
  }

  batches := trace.Batches

  if d.cfg.LogReceivedSpans.Enabled {
    logSpans(batches, &d.cfg.LogReceivedSpans, d.logger)
  }
  if d.cfg.MetricReceivedSpans.Enabled {
    metricSpans(batches, userID, &d.cfg.MetricReceivedSpans)
  }

  metricBytesIngested.WithLabelValues(userID).Add(float64(size))
  metricSpansIngested.WithLabelValues(userID).Add(float64(spanCount))

  keys, rebatchedTraces, err := requestsByTraceID(batches, userID, spanCount)
  if err != nil {
    overrides.RecordDiscardedSpans(spanCount, reasonInternalError, userID)
    return nil, err
  }

  err = d.sendToIngestersViaBytes(ctx, userID, rebatchedTraces, keys)
  if err != nil {
    recordDiscaredSpans(err, userID, spanCount)
    return nil, err
  }

  if len(d.overrides.MetricsGeneratorProcessors(userID)) > 0 {
    d.generatorForwarder.SendTraces(ctx, userID, keys, rebatchedTraces)
  }

  if err := d.forwardersManager.ForTenant(userID).ForwardTraces(ctx, traces); err != nil {
    _ = level.Warn(d.logger).Log("msg""failed to forward batches for tenant=%s: %w", userID, err)
  }

  return nilnil // PushRequest is ignored, so no reason to create one
}

通过函数的定义,可以看到请求参数为接收的Trace结构,返回参数为调用sendToIngestersViaBytes的返回结构。

PushTraces(ctx context.Contexttraces ptrace.Traces) (*tempopb.PushResponseerror)

读取租户信息

从请求中获取租户名称(上一节在初始化Distributor时注册的Middleware中写入orgIDContextKey中获取)。

从请求中获取当前接收的Trace包含的span数量如果数量为0则直接返回。同时还获取这一次请求的数据包大小,用来判断是否对Ingress限流处理。

限流

根据上一步中获取到的包大小进行限流判断,该函数将会根据ingestionRateLimiter配置判断是否触发限流。

在构建Distributor时会根据overrides的rate_stragy和rate_limit_bytes构建ingestionRateLimiter对象。

ingestionRateLimiter表示10秒内distributor接收的包超过多少则进行限流。

在官网配置的讲解中可以看到,rate_stragy确定限流配置是给distributor和ingestor共用,还是两者单独配置;rate_limit_bytes表示限流的包大小,默认为15MB

转化结构和记录日志

将Trace数据序列化为tempopb.Trace结构,并根据配置项log_received_spans、metric_received_spans 确定是否记录接收的Trace详细日志和记录Trace指标,这两个配置项一般用作debug排查错误,不建议在生产环境开启。

计算hashkey

将接收的Trace根据span、租户ID计算对应的hashkey,并返回各个span对应的hash值。

这里计算hashkey是通过hash一致性算法将span分别交给不同的ingestor实例进行处理,起到负载均衡的作用。关于hash一致性算法大家感兴趣的可以搜索学习,后续将会有单独的文章来讲解hash一致性算法。

将数据发送至Ingress

在sendToIngestersViaBytes 中通过ring.DoBatch 函数真正的处理发送,ring.DoBatch 根据hash一致性算法取出该key对应的span将发送在hash环上的哪个ingress实例,并通过grpc方法PushBytesV2发送给Ingress实例

将数据发送给ingress后再执行异步的forwarder操作

IngesterRing

在发送指标给Ingester时,IngesterRing是什么时候、怎么初始化的呢?

在前初始化module时,有讲到初始化对应module前将会初始化他所依赖的模块,而在初始化Distributor前就已经初始化好了IngesterRing模块。

该模块由配置文件ingester的lifecycler定义

以上为Distributor组件对Trace数据处理和发送至Ingester组件的代码讲解。下一节将带来数据发送至Ingeste组件后,Ingeste组件如何进行处理并写入存储。

往期回顾:

Grafana Tempo源码解读(一)Distributor建立监听接收Trace数据

文章转载自栋总侃技术,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论