暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

fastapi微服务系列(9)-之封装一个简单拦截器来处理GRPC整合Jaeger做链路追踪

小儿来一壶枸杞酒泡茶 2021-11-26
1710

上节回顾:

使用第三方库的的时候已经是到了崩溃的状态了!感觉太多的问题了~而且封装的复杂,不够浅显易懂。不得还是自己来搞一个自己懂的,且能自己维护的才可以!

1、需要理解知识点

下面的这几个点为封装的初步提供最关键一些信息点。也主要是对我们的追踪的一些必须理解的知识点。这里有必要的再补充下。

  • 创建Trace
from jaeger_client import Config

def init_jaeger_tracer(service_name='your-app-name'):
    config = Config(config={}, service_name=service_name)
    return config.initialize_tracer()

  • 创建和结束Span
// 开始无Parent的Span。
tracer.start_span('TestSpan'
// 开始有Parent的Span。
tracer.start_span('ChildSpan', child_of=span)
// 结束Span。
 span.finish()

  • 透传SpanContext
// 将spanContext透传到下一个Span中(序列化)。
tracer.inject(
        span_context=span.context, format=Format.TEXT_MAP, carrier=carrier
    )
// 解析透传过来的spanContxt(反序列化)。
span_ctx = tracer.extract(format=Format.TEXT_MAP, carrier={})

2、封装思路的回顾

2.1 服务端的封装思路

  • 1:定义一个全局tracer,可以考虑复制到我们的grpc-app全局的对象里面

  • 2:通过我们grpc自定义一个拦截器

  • 3:通过拦截器里面获取我们app里面定义全局的tracer对象

  • 4:通过判断是否传递有相关的metadata信息来进行判断是否需要extract来创建我们的(child_of)span_context对象--这步是处理跨服务的时候的关键的地方,需要反序列化!

  • 5:同事需要对异常捕获处理,记录一些管我们的tags信息,主要包括信息有:

    • ipv4_re里面的address和port
  • 6:然后开始创建start_span,并加入相关的span.log_kv

  • 7:在我们的中间件里面进的服务信息的传递,创建关于要记录的RpcInfo信息,主要包含有:

    • full_method
    • metadata
    • timeout
    • request
  • 8: 在响应处理的地方还可以记录相关的响应体的日志信息 span.log_kv({'response': response})

  • 9:如果是错误的话,还需要设置对应的 span.set_tag('error', True),然后记录我们的错误信息到  span.log_kv(error_log)

2.2 服务端的封装实践

其实原来也简单,主要是再拦截器的地方,进行我们的sapn反序列的处理。

完整代码:

#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   文件名称 :     ZyxOpentracing
   文件功能描述 :   功能描述
   创建人 :       小钟同学
   创建时间 :          2021/11/24
-------------------------------------------------
   修改描述-2021/11/24:         
-------------------------------------------------
"
""
from jaeger_client import Config
from grpc_interceptor import ServerInterceptor
from grpc_interceptor.exceptions import GrpcException
from grpc_interceptor.exceptions import NotFound
from typing import Any, Callable
from opentracing import tags
from opentracing.propagation import Format
import opentracing
import logging
import grpc
import re
from opentracing.ext import tags as ot_tags
from grpc_opentracing import grpcext, ActiveSpanSource

def grpc_opentracing_init():
    # 启动时候加入链路追踪的拦截器
    config = Config(
        # usually read from some yaml config---意思是配置信息其实可以从某个yaml文件进行读取
        config={
            # sampler 采样
            'sampler': {
                'type''const',  # 采样类型
                'param': 1,  # 采样开关 1:开启全部采样 0:关闭全部
            },
            # 配置链接到我们的本地的agent,通过agent来上报
            'local_agent': {
                # 注意这里是指定了JaegerAgent的host和port。
                # 根据官方建议为了保证数据可靠性,JaegerClient和JaegerAgent运行在同一台主机内,因此reporting_host填写为127.0.0.1。
                'reporting_host''127.0.0.1',
                'reporting_port': 6831,
            },
            'logging': True,
        },
        # 这里填写应用名称---服务的名称
        service_name="grpc_serve",
        validate=True
    )
    tracer = config.initialize_tracer()
    return tracer


def _add_peer_tags(peer_str, tags):
    # 主要是解析出来我们的地址信息
    ipv4_re = r"ipv4:(?P<address>.+):(?P<port>\d+)"
    match = re.match(ipv4_re, peer_str)
    if match:
        tags[ot_tags.PEER_HOST_IPV4] = match.group('address')
        tags[ot_tags.PEER_PORT] = match.group('port')
        return
    ipv6_re = r"ipv6:\[(?P<address>.+)\]:(?P<port>\d+)"
    match = re.match(ipv6_re, peer_str)
    if match:
        tags[ot_tags.PEER_HOST_IPV6] = match.group('address')
        tags[ot_tags.PEER_PORT] = match.group('port')
        return
    logging.warning('Unrecognized peer: \"%s\"', peer_str)




class Zyxopentracing(ServerInterceptor):

    def __init__(self, log_payloads=False):
        pass
        print("审核当回事当")
        self.tracer_interceptor = grpc_opentracing_init()
        # 是否 记录我们的request请求提交的payloads信息,请求参数信息
        self._log_payloads = log_payloads

    @staticmethod
    def _before_request_start_span(request, servicer_context, method_name, _tracer):
        """
        Gather various info about the request and start new span with the data.
        "
""
        # print("唉是滴哈是:开始----1", tracer)
        # span_context = tracer.extract(format=Format.HTTP_HEADERS, carrier=request.headers)

        span_context = None
        error = None
        # 获取到对应的客户端提交过来的请求头信息
        metadata = servicer_context.invocation_metadata()
        # 根据请求头信息,进行对应的解析透传过来的spanContxt(反序列化)。
        # 以下参考开源项目的处理方式
        try:
            if metadata:
                # HTTP_HEADERS是 http的透传传输的方式
                # span_context = self._tracer.extract(
                #     opentracing.Format.HTTP_HEADERS, dict(metadata))
                # TEXT_MAP 处理的是grapc的方式
                # 开始解析透传过来的spanContxt(反序列化
                span_context = _tracer.extract(opentracing.Format.TEXT_MAP, dict(metadata))
                # print("反编译过来!!!",span_context里面包含有相关的trace_id和span_id等信息,可以打印dir(span_context))
        except (opentracing.UnsupportedFormatException, opentracing.InvalidCarrierException, opentracing.SpanContextCorruptedException) as e:
            error = e
            logging.exception('tracer.extract() failed')
        # 开始处理扩展纤细tags的添加
        # metadatametadatametadata(('uber-trace-id', '3fec1eb736f1bff7:2a263288419df8a1:0:1'), )
        # value = '3fec1eb736f1bff7:2a263288419df8a1:0:1'
        peer_tags = {
            tags.COMPONENT: 'grpc',
            # 注意这个标记是服务端的
            tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER
        }
        # 处理我们peer_tags,
        _add_peer_tags(servicer_context.peer(), peer_tags)
        # 创建对应的span
        span = _tracer.start_span(operation_name=method_name, child_of=span_context, tags=peer_tags)
        # 如果链路调用过程存在错误信息,则也需要记录到我们的时间里面,并记录错误信息
        if error is not None:
            span.set_tag('error', True)
            span.log_kv({'event''error''error.object': error})
        return span

    def intercept(self, method: Callable, request: Any, servicer_context: grpc.ServicerContext, method_name: str) -> Any:
        rsep = None
        # 使用上下文管理的方式处理我们的span
        with self._before_request_start_span(request, servicer_context, method_name, self.tracer_interceptor) as span:
            try:
                if self._log_payloads:
                    span.log_kv({'request': request})
                # 做一层转接的代理处理
                # servicer_context = _OpenTracingServicerContext(servicer_context, span)
                #
                rsep = method(request, servicer_context)
            except GrpcException as e:
                # 记录错误的信息
                span.set_tag('error', True)
                span.log_kv({'event''error''error.object': e})
                # 抛出错误信息
                servicer_context.set_code(e.status_code)
                servicer_context.set_details(e.details)
                raise
            finally:
                # 记录非grpc.StatusCode.OK的错误的响应的日志信息到追踪链路上
                # servicer_context 这里做了一层封装道理才可以获取到code,默认的是grpc.StatusCode.OK
                # if servicer_context.code != grpc.StatusCode.OK:
                #     print("奇葩了!!!!!!!")
                #     span.set_tag('error', True)
                #     error_log = {'event': 'error', 'error.kind': str(servicer_context.code)}
                #     if servicer_context.details is not None:
                #         error_log['message'] = servicer_context.details
                #     span.log_kv(error_log)
                return rsep


然后的使用的示例如下:

#!/usr/bin/evn python
# -*- coding: utf-8 -*-
import sys
from concurrent import futures
import time
import grpc
import hello_pb2
import hello_pb2_grpc
import signal
from typing import Any, Callable


# 实现 proto文件中定义的 GreeterServicer的接口
class Greeter(hello_pb2_grpc.GreeterServicer):
    # 实现 proto 文件中定义的 rpc 调用
    def SayHello(self, request, context):
        # 返回是我们的定义的响应体的对象
        return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

    def SayHelloAgain(self, request, context):
        # 返回是我们的定义的响应体的对象

        # # 设置异常状态码
        # context.set_code(grpc.StatusCode.PERMISSION_DENIED)
        # context.set_details("你没有这个访问的权限")
        # raise context
        print("奇葩了!q1!contextcontext!!!!!", str(context.code))
        # 接收请求头的信息
        print("接收到的请求头元数据信息", context.invocation_metadata())
        # 设置响应报文头信息
        context.set_trailing_metadata((('name''223232'), ('sex''23232')))
        # 三种的压缩机制处理
        # NoCompression = _compression.NoCompression
        # Deflate = _compression.Deflate
        # Gzip = _compression.Gzip
        # 局部的数据进行压缩
        context.set_compression(grpc.Compression.Gzip)
        return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))


from grpc_interceptor import ServerInterceptor
from grpc_interceptor.exceptions import GrpcException
from grpc_interceptor.exceptions import NotFound
from ZyxServeTracingInterceptor import Zyxopentracing




def serve():



    compression = grpc.Compression.Gzip
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
                         compression=compression,
                         interceptors=[Zyxopentracing()])
    # 经过intercept_server再处理一
    # 添加我们服务
    hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    # 获取随机的端口:
    port = 45564
    print("启动端口!!!!",port)
    server.add_insecure_port(f'[::]:{port}')

    server.start()

    def stop_serve(signum, frame):
        print("进程结束了!!!!")
        # sys.exit(0)
        raise KeyboardInterrupt
    signal.signal(signal.SIGINT, stop_serve)
    server.wait_for_termination()


if __name__ == '__main__':
    serve()


其实和平时使用的拦截器一样:

2.2 客户端封装实践

客户端出现的问题有:

问题1:

WARNING:jaeger_tracing:Jaeger tracer already initialized, skipping
Traceback (most recent call last):
  File "D:/code/python/local_python/grpclearn/demo5/client_opentracing.py", line 133, in <module>
    run()
  File "D:/code/python/local_python/grpclearn/demo5/client_opentracing.py", line 114, in run
    timeout=5,
  File "C:\Users\mayn\.virtualenvs\grpclearn\lib\site-packages\grpc\_interceptor.py", line 271, in with_call
    compression=compression)
  File "C:\Users\mayn\.virtualenvs\grpclearn\lib\site-packages\grpc\_interceptor.py", line 257, in _with_call
    return call.result(), call
AttributeError: 'NoneType' object has no attribute 'result'

上面的问题是因为我重复的进行Jaeger tracer实例对象的重复初始化!!

问题2:出现了

jaeger invalid parent span IDs=xxxxx4; skipping clock skew adjustment

类似的问题。

这TM 有点离谱!!!!这个问题出现的原因是,如果我的只是执行了一次客户端提交!就退出的话,就会这样!!循环提交多次执行就不会!!!现在比较纳闷!!!

后来才知道!!!是因为我用完的时候没进行关闭!!我勒个去!!!

所有客户端的拦截器的完整代码如下:

#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   文件名称 :     ZyxOpentracing
   文件功能描述 :   功能描述
   创建人 :       小钟同学
   创建时间 :          2021/11/24
-------------------------------------------------
   修改描述-2021/11/24:         
-------------------------------------------------
"
""
from jaeger_client import Config
from grpc_interceptor import ServerInterceptor
from grpc_interceptor.exceptions import GrpcException
from grpc_interceptor.exceptions import NotFound
from typing import Any, Callable
# from opentracing.ext import tags as ot_tags
from opentracing.propagation import Format
import opentracing
import logging
import grpc
import re
from six import iteritems

from opentracing import tags as ot_tags





from grpc_interceptor import ClientCallDetails, ClientInterceptor


class ZyxCliopentracing(ClientInterceptor):

    def __init__(self, tracer_interceptor,active_span_source=None, log_payloads=False):
        pass
        print("审核当回事当")
        self.tracer_interceptor = tracer_interceptor
        # 是否 记录我们的request请求提交的payloads信息,请求参数信息
        self._log_payloads = log_payloads
        self._active_span_source = active_span_source

    def _before_request_start_span(self, method_name, parent=None):

        # client_tags = {
        #     tags.COMPONENT: 'grpc',
        #     # 注意这个标记是客户端的
        #     tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT
        # }
        tags = {
            ot_tags.COMPONENT: 'grpc',
            ot_tags.SPAN_KIND: ot_tags.SPAN_KIND_RPC_CLIENT
        }
        return self.tracer_interceptor.start_span(operation_name=method_name, child_of=parent, tags=tags)

        # return self.tracer_interceptor.start_span(operation_name=method_name, tags=client_tags, child_of=parent)

    def _inject_span_context(self, span, metadata):
        headers = {}
        try:
            # _inject--对我们的请求头数据进行将spanContext透传到下一个Span中(序列化)。
            self.tracer_interceptor.inject(span.context, opentracing.Format.TEXT_MAP, headers)
            # self.tracer_interceptor.inject(span.context, opentracing.Format.HTTP_HEADERS, headers)
        except (opentracing.UnsupportedFormatException,
                opentracing.InvalidCarrierException,
                opentracing.SpanContextCorruptedException) as e:
            logging.exception('tracer.inject() failed')
            span.set_tag('error', True)
            span.log_kv({'event''error''error.object': e})
            return metadata
        # 元数据信息进行转化
        metadata = () if metadata is None else tuple(metadata)
        return metadata + tuple((k.lower(), v) for (k, v) in iteritems(headers))

    def intercept(self, method: Callable, request_or_iterator: Any, call_details: grpc.ClientCallDetails) -> Any:
        if self.tracer_interceptor:
            with self._before_request_start_span(call_details.method) as span:
                # _inject--对我们的请求头数据进行将spanContext透传到下一个Span中(序列化)。
                metadata = self._inject_span_context(span, call_details.metadata)
                print("metadatametadatametadata", metadata)
                new_details = ClientCallDetails(
                    call_details.method,
                    call_details.timeout,
                    metadata,
                    call_details.credentials,
                    call_details.wait_for_ready,
                    call_details.compression,
                )
                return method(request_or_iterator, new_details)


使用测试代码:

#!/usr/bin/evn python
# -*- coding: utf-8 -*-


import grpc
import hello_pb2
import hello_pb2_grpc

 # 连接consul服务,作为dns服务器



from dns import resolver
from dns.exception import DNSException

# 创建一个consul dns查询的 resolver
consul_resolver = resolver.Resolver()
consul_resolver.port = 8600
consul_resolver.nameservers = ['127.0.0.1']


def get_ip_port(server_name):
    '''查询出可用的一个ip,和端口'''
    try:
        dnsanswer = consul_resolver.resolve(f'{server_name}.service.consul'"A")
        dnsanswer_srv = consul_resolver.resolve(f"{server_name}.service.consul""SRV")
    except DNSException:
        return None, None
    return dnsanswer[0].address, dnsanswer_srv[0].port

from jaeger_client import Config
from grpc_opentracing import open_tracing_client_interceptor
from grpc_opentracing.grpcext import intercept_channel
from demo5.ZyxClientTracingInterceptor import ZyxCliopentracing
def grpc_opentracing_init():
    # 启动时候加入链路追踪的拦截器
    config = Config(
        # usually read from some yaml config---意思是配置信息其实可以从某个yaml文件进行读取
        config={
            # sampler 采样
            'sampler': {
                'type''const',  # 采样类型
                'param': 1,  # 采样开关 1:开启全部采样 0:关闭全部
            },
            # 配置链接到我们的本地的agent,通过agent来上报
            'local_agent': {
                # 注意这里是指定了JaegerAgent的host和port。
                # 根据官方建议为了保证数据可靠性,JaegerClient和JaegerAgent运行在同一台主机内,因此reporting_host填写为127.0.0.1。
                'reporting_host''127.0.0.1',
                'reporting_port': 6831,
            },
            'logging': True,
        },
        # 这里填写应用名称---服务的名称
        service_name="grpc_client",
        validate=True
    )

    tracer = config.initialize_tracer()
    # 注意导入的是客户端的的拦截器的
    tracer_interceptor = open_tracing_client_interceptor(tracer)
    return tracer_interceptor,tracer

def grpc_opentracing_init():
    # 启动时候加入链路追踪的拦截器
    config = Config(
        # usually read from some yaml config---意思是配置信息其实可以从某个yaml文件进行读取
        config={
            # sampler 采样
            'sampler': {
                'type''const',  # 采样类型
                'param': 1,  # 采样开关 1:开启全部采样 0:关闭全部
            },
            # 配置链接到我们的本地的agent,通过agent来上报
            'local_agent': {
                # 注意这里是指定了JaegerAgent的host和port。
                # 根据官方建议为了保证数据可靠性,JaegerClient和JaegerAgent运行在同一台主机内,因此reporting_host填写为127.0.0.1。
                'reporting_host''127.0.0.1',
                'reporting_port': 6831,
            },
            'logging': True,
        },
        # 这里填写应用名称---服务的名称
        service_name="grpc_client",
        validate=True
    )
    tracer = config.initialize_tracer()
    return tracer


tracer_interceptor = grpc_opentracing_init()

def run():

    compression = grpc.Compression.Gzip


    # 添加客户端的请求拦截器
    # 单纯的子节点上传了,父节点没有上传
    interceptors = [ZyxCliopentracing(tracer_interceptor=tracer_interceptor)]
    with grpc.insecure_channel(target=f'127.0.0.1:45564',compression=compression) as channel:
        # 通过通道服务一个服务intercept_channel
        interceptor_channel = grpc.intercept_channel(channel,*interceptors)
        stub = hello_pb2_grpc.GreeterStub(interceptor_channel)
        # 生成请求我们的服务的函数的时候,需要传递的参数体,它放在hello_pb2里面-请求体为:hello_pb2.HelloRequest对象
        try:

            # reest_header = (
            #     ('mesasge', '1010'),
            #     ('error', 'No Error')
            # )
            response, callbask = stub.SayHelloAgain.with_call(request=hello_pb2.HelloRequest(name='欢迎下次光临'),
                                                              # 设置请求的超时处理
                                                              timeout=5,
                                                              # 设置请求的头的信息
                                                              # metadata=reest_header,
                                                              )
            print("SayHelloAgain函数调用结果的返回: " + response.message)
            print("SayHelloAgain函数调用结果的返回---响应报文头信息: ", callbask.trailing_metadata())
        except grpc._channel._InactiveRpcError as e:
            print(e.code())
            print(e.details())
        finally:
            # 关闭追踪
            pass
            time.sleep(2)
            tracer_interceptor.close()
            time.sleep(3)


if __name__ == '__main__':
    import time
    for _ in range(1):
        time.sleep(1)
        run()


然后最后测试验证查看链路:

完美的实现了!

查看输出信息:客户端:

审核当回事当
metadatametadatametadata (('uber-trace-id''fba029d81d1b0226:b058e6ffc5614c5e:0:1'),)
SayHelloAgain函数调用结果的返回: hello 欢迎下次光临
SayHelloAgain函数调用结果的返回---响应报文头信息:  (_Metadatum(key='name', value='223232'), _Metadatum(key='sex', value='23232'))

服务端:

审核当回事当
启动端口!!!!45564
奇葩了!q1!contextcontext!!!!! <bound method _Context.code of <grpc._server._Context object at 0x000002552EA8C588>>
接收到的请求头元数据信息 (_Metadatum(key='uber-trace-id', value='fba029d81d1b0226:b058e6ffc5614c5e:0:1'), _Metadatum(key='user-agent', value='grpc-python/1.41.1 grpc-c/19.0.0 (windows; chttp2)'))

注意识相点有:

  • 客户端不能重复实例化
  • 发送客户端的 ot_tags.SPAN_KIND_RPC_CLIENT 是客户端类型
  • 建议使用下面的opentracing.Format.TEXT_MAP格式的

总结

以上仅仅是个人结合自己的实际需求,做学习的实践笔记!如有笔误!欢迎批评指正!感谢各位大佬!

结尾

END

简书:https://www.jianshu.com/u/d6960089b087

掘金:https://juejin.cn/user/2963939079225608

公众号:微信搜【小儿来一壶枸杞酒泡茶】

小钟同学 | 文 【欢迎一起学习交流】| QQ:308711822


文章转载自小儿来一壶枸杞酒泡茶,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论