上节回顾:
使用第三方库的的时候已经是到了崩溃的状态了!感觉太多的问题了~而且封装的复杂,不够浅显易懂。不得还是自己来搞一个自己懂的,且能自己维护的才可以!
1、需要理解知识点
下面的这几个点为封装的初步提供最关键一些信息点。也主要是对我们的追踪的一些必须理解的知识点。这里有必要的再补充下。
创建Trace
from jaeger_client import Config
def init_jaeger_tracer(service_name='your-app-name'):
config = Config(config={}, service_name=service_name)
return config.initialize_tracer()
创建和结束Span
// 开始无Parent的Span。
tracer.start_span('TestSpan')
// 开始有Parent的Span。
tracer.start_span('ChildSpan', child_of=span)
// 结束Span。
span.finish()
透传SpanContext
// 将spanContext透传到下一个Span中(序列化)。
tracer.inject(
span_context=span.context, format=Format.TEXT_MAP, carrier=carrier
)
// 解析透传过来的spanContxt(反序列化)。
span_ctx = tracer.extract(format=Format.TEXT_MAP, carrier={})
2、封装思路的回顾
2.1 服务端的封装思路
1:定义一个全局tracer,可以考虑复制到我们的grpc-app全局的对象里面
2:通过我们grpc自定义一个拦截器
3:通过拦截器里面获取我们app里面定义全局的tracer对象
4:通过判断是否传递有相关的metadata信息来进行判断是否需要extract来创建我们的(child_of)span_context对象--这步是处理跨服务的时候的关键的地方,需要反序列化!
5:同事需要对异常捕获处理,记录一些管我们的tags信息,主要包括信息有:
ipv4_re里面的address和port 6:然后开始创建start_span,并加入相关的span.log_kv
7:在我们的中间件里面进的服务信息的传递,创建关于要记录的RpcInfo信息,主要包含有:
full_method metadata timeout request 8: 在响应处理的地方还可以记录相关的响应体的日志信息 span.log_kv({'response': response})
9:如果是错误的话,还需要设置对应的 span.set_tag('error', True),然后记录我们的错误信息到 span.log_kv(error_log)
2.2 服务端的封装实践
其实原来也简单,主要是再拦截器的地方,进行我们的sapn反序列的处理。
完整代码:
#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
文件名称 : ZyxOpentracing
文件功能描述 : 功能描述
创建人 : 小钟同学
创建时间 : 2021/11/24
-------------------------------------------------
修改描述-2021/11/24:
-------------------------------------------------
"""
from jaeger_client import Config
from grpc_interceptor import ServerInterceptor
from grpc_interceptor.exceptions import GrpcException
from grpc_interceptor.exceptions import NotFound
from typing import Any, Callable
from opentracing import tags
from opentracing.propagation import Format
import opentracing
import logging
import grpc
import re
from opentracing.ext import tags as ot_tags
from grpc_opentracing import grpcext, ActiveSpanSource
def grpc_opentracing_init():
# 启动时候加入链路追踪的拦截器
config = Config(
# usually read from some yaml config---意思是配置信息其实可以从某个yaml文件进行读取
config={
# sampler 采样
'sampler': {
'type': 'const', # 采样类型
'param': 1, # 采样开关 1:开启全部采样 0:关闭全部
},
# 配置链接到我们的本地的agent,通过agent来上报
'local_agent': {
# 注意这里是指定了JaegerAgent的host和port。
# 根据官方建议为了保证数据可靠性,JaegerClient和JaegerAgent运行在同一台主机内,因此reporting_host填写为127.0.0.1。
'reporting_host': '127.0.0.1',
'reporting_port': 6831,
},
'logging': True,
},
# 这里填写应用名称---服务的名称
service_name="grpc_serve",
validate=True
)
tracer = config.initialize_tracer()
return tracer
def _add_peer_tags(peer_str, tags):
# 主要是解析出来我们的地址信息
ipv4_re = r"ipv4:(?P<address>.+):(?P<port>\d+)"
match = re.match(ipv4_re, peer_str)
if match:
tags[ot_tags.PEER_HOST_IPV4] = match.group('address')
tags[ot_tags.PEER_PORT] = match.group('port')
return
ipv6_re = r"ipv6:\[(?P<address>.+)\]:(?P<port>\d+)"
match = re.match(ipv6_re, peer_str)
if match:
tags[ot_tags.PEER_HOST_IPV6] = match.group('address')
tags[ot_tags.PEER_PORT] = match.group('port')
return
logging.warning('Unrecognized peer: \"%s\"', peer_str)
class Zyxopentracing(ServerInterceptor):
def __init__(self, log_payloads=False):
pass
print("审核当回事当")
self.tracer_interceptor = grpc_opentracing_init()
# 是否 记录我们的request请求提交的payloads信息,请求参数信息
self._log_payloads = log_payloads
@staticmethod
def _before_request_start_span(request, servicer_context, method_name, _tracer):
"""
Gather various info about the request and start new span with the data.
"""
# print("唉是滴哈是:开始----1", tracer)
# span_context = tracer.extract(format=Format.HTTP_HEADERS, carrier=request.headers)
span_context = None
error = None
# 获取到对应的客户端提交过来的请求头信息
metadata = servicer_context.invocation_metadata()
# 根据请求头信息,进行对应的解析透传过来的spanContxt(反序列化)。
# 以下参考开源项目的处理方式
try:
if metadata:
# HTTP_HEADERS是 http的透传传输的方式
# span_context = self._tracer.extract(
# opentracing.Format.HTTP_HEADERS, dict(metadata))
# TEXT_MAP 处理的是grapc的方式
# 开始解析透传过来的spanContxt(反序列化
span_context = _tracer.extract(opentracing.Format.TEXT_MAP, dict(metadata))
# print("反编译过来!!!",span_context里面包含有相关的trace_id和span_id等信息,可以打印dir(span_context))
except (opentracing.UnsupportedFormatException, opentracing.InvalidCarrierException, opentracing.SpanContextCorruptedException) as e:
error = e
logging.exception('tracer.extract() failed')
# 开始处理扩展纤细tags的添加
# metadatametadatametadata(('uber-trace-id', '3fec1eb736f1bff7:2a263288419df8a1:0:1'), )
# value = '3fec1eb736f1bff7:2a263288419df8a1:0:1'
peer_tags = {
tags.COMPONENT: 'grpc',
# 注意这个标记是服务端的
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER
}
# 处理我们peer_tags,
_add_peer_tags(servicer_context.peer(), peer_tags)
# 创建对应的span
span = _tracer.start_span(operation_name=method_name, child_of=span_context, tags=peer_tags)
# 如果链路调用过程存在错误信息,则也需要记录到我们的时间里面,并记录错误信息
if error is not None:
span.set_tag('error', True)
span.log_kv({'event': 'error', 'error.object': error})
return span
def intercept(self, method: Callable, request: Any, servicer_context: grpc.ServicerContext, method_name: str) -> Any:
rsep = None
# 使用上下文管理的方式处理我们的span
with self._before_request_start_span(request, servicer_context, method_name, self.tracer_interceptor) as span:
try:
if self._log_payloads:
span.log_kv({'request': request})
# 做一层转接的代理处理
# servicer_context = _OpenTracingServicerContext(servicer_context, span)
#
rsep = method(request, servicer_context)
except GrpcException as e:
# 记录错误的信息
span.set_tag('error', True)
span.log_kv({'event': 'error', 'error.object': e})
# 抛出错误信息
servicer_context.set_code(e.status_code)
servicer_context.set_details(e.details)
raise
finally:
# 记录非grpc.StatusCode.OK的错误的响应的日志信息到追踪链路上
# servicer_context 这里做了一层封装道理才可以获取到code,默认的是grpc.StatusCode.OK
# if servicer_context.code != grpc.StatusCode.OK:
# print("奇葩了!!!!!!!")
# span.set_tag('error', True)
# error_log = {'event': 'error', 'error.kind': str(servicer_context.code)}
# if servicer_context.details is not None:
# error_log['message'] = servicer_context.details
# span.log_kv(error_log)
return rsep
然后的使用的示例如下:
#!/usr/bin/evn python
# -*- coding: utf-8 -*-
import sys
from concurrent import futures
import time
import grpc
import hello_pb2
import hello_pb2_grpc
import signal
from typing import Any, Callable
# 实现 proto文件中定义的 GreeterServicer的接口
class Greeter(hello_pb2_grpc.GreeterServicer):
# 实现 proto 文件中定义的 rpc 调用
def SayHello(self, request, context):
# 返回是我们的定义的响应体的对象
return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))
def SayHelloAgain(self, request, context):
# 返回是我们的定义的响应体的对象
# # 设置异常状态码
# context.set_code(grpc.StatusCode.PERMISSION_DENIED)
# context.set_details("你没有这个访问的权限")
# raise context
print("奇葩了!q1!contextcontext!!!!!", str(context.code))
# 接收请求头的信息
print("接收到的请求头元数据信息", context.invocation_metadata())
# 设置响应报文头信息
context.set_trailing_metadata((('name', '223232'), ('sex', '23232')))
# 三种的压缩机制处理
# NoCompression = _compression.NoCompression
# Deflate = _compression.Deflate
# Gzip = _compression.Gzip
# 局部的数据进行压缩
context.set_compression(grpc.Compression.Gzip)
return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))
from grpc_interceptor import ServerInterceptor
from grpc_interceptor.exceptions import GrpcException
from grpc_interceptor.exceptions import NotFound
from ZyxServeTracingInterceptor import Zyxopentracing
def serve():
compression = grpc.Compression.Gzip
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
compression=compression,
interceptors=[Zyxopentracing()])
# 经过intercept_server再处理一
# 添加我们服务
hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
# 获取随机的端口:
port = 45564
print("启动端口!!!!",port)
server.add_insecure_port(f'[::]:{port}')
server.start()
def stop_serve(signum, frame):
print("进程结束了!!!!")
# sys.exit(0)
raise KeyboardInterrupt
signal.signal(signal.SIGINT, stop_serve)
server.wait_for_termination()
if __name__ == '__main__':
serve()
其实和平时使用的拦截器一样:

2.2 客户端封装实践
客户端出现的问题有:
问题1:
WARNING:jaeger_tracing:Jaeger tracer already initialized, skipping
Traceback (most recent call last):
File "D:/code/python/local_python/grpclearn/demo5/client_opentracing.py", line 133, in <module>
run()
File "D:/code/python/local_python/grpclearn/demo5/client_opentracing.py", line 114, in run
timeout=5,
File "C:\Users\mayn\.virtualenvs\grpclearn\lib\site-packages\grpc\_interceptor.py", line 271, in with_call
compression=compression)
File "C:\Users\mayn\.virtualenvs\grpclearn\lib\site-packages\grpc\_interceptor.py", line 257, in _with_call
return call.result(), call
AttributeError: 'NoneType' object has no attribute 'result'
上面的问题是因为我重复的进行Jaeger tracer实例对象的重复初始化!!
问题2:出现了
jaeger invalid parent span IDs=xxxxx4; skipping clock skew adjustment
类似的问题。
这TM 有点离谱!!!!这个问题出现的原因是,如果我的只是执行了一次客户端提交!就退出的话,就会这样!!循环提交多次执行就不会!!!现在比较纳闷!!!
后来才知道!!!是因为我用完的时候没进行关闭!!我勒个去!!!

所有客户端的拦截器的完整代码如下:
#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
文件名称 : ZyxOpentracing
文件功能描述 : 功能描述
创建人 : 小钟同学
创建时间 : 2021/11/24
-------------------------------------------------
修改描述-2021/11/24:
-------------------------------------------------
"""
from jaeger_client import Config
from grpc_interceptor import ServerInterceptor
from grpc_interceptor.exceptions import GrpcException
from grpc_interceptor.exceptions import NotFound
from typing import Any, Callable
# from opentracing.ext import tags as ot_tags
from opentracing.propagation import Format
import opentracing
import logging
import grpc
import re
from six import iteritems
from opentracing import tags as ot_tags
from grpc_interceptor import ClientCallDetails, ClientInterceptor
class ZyxCliopentracing(ClientInterceptor):
def __init__(self, tracer_interceptor,active_span_source=None, log_payloads=False):
pass
print("审核当回事当")
self.tracer_interceptor = tracer_interceptor
# 是否 记录我们的request请求提交的payloads信息,请求参数信息
self._log_payloads = log_payloads
self._active_span_source = active_span_source
def _before_request_start_span(self, method_name, parent=None):
# client_tags = {
# tags.COMPONENT: 'grpc',
# # 注意这个标记是客户端的
# tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT
# }
tags = {
ot_tags.COMPONENT: 'grpc',
ot_tags.SPAN_KIND: ot_tags.SPAN_KIND_RPC_CLIENT
}
return self.tracer_interceptor.start_span(operation_name=method_name, child_of=parent, tags=tags)
# return self.tracer_interceptor.start_span(operation_name=method_name, tags=client_tags, child_of=parent)
def _inject_span_context(self, span, metadata):
headers = {}
try:
# _inject--对我们的请求头数据进行将spanContext透传到下一个Span中(序列化)。
self.tracer_interceptor.inject(span.context, opentracing.Format.TEXT_MAP, headers)
# self.tracer_interceptor.inject(span.context, opentracing.Format.HTTP_HEADERS, headers)
except (opentracing.UnsupportedFormatException,
opentracing.InvalidCarrierException,
opentracing.SpanContextCorruptedException) as e:
logging.exception('tracer.inject() failed')
span.set_tag('error', True)
span.log_kv({'event': 'error', 'error.object': e})
return metadata
# 元数据信息进行转化
metadata = () if metadata is None else tuple(metadata)
return metadata + tuple((k.lower(), v) for (k, v) in iteritems(headers))
def intercept(self, method: Callable, request_or_iterator: Any, call_details: grpc.ClientCallDetails) -> Any:
if self.tracer_interceptor:
with self._before_request_start_span(call_details.method) as span:
# _inject--对我们的请求头数据进行将spanContext透传到下一个Span中(序列化)。
metadata = self._inject_span_context(span, call_details.metadata)
print("metadatametadatametadata", metadata)
new_details = ClientCallDetails(
call_details.method,
call_details.timeout,
metadata,
call_details.credentials,
call_details.wait_for_ready,
call_details.compression,
)
return method(request_or_iterator, new_details)
使用测试代码:
#!/usr/bin/evn python
# -*- coding: utf-8 -*-
import grpc
import hello_pb2
import hello_pb2_grpc
# 连接consul服务,作为dns服务器
from dns import resolver
from dns.exception import DNSException
# 创建一个consul dns查询的 resolver
consul_resolver = resolver.Resolver()
consul_resolver.port = 8600
consul_resolver.nameservers = ['127.0.0.1']
def get_ip_port(server_name):
'''查询出可用的一个ip,和端口'''
try:
dnsanswer = consul_resolver.resolve(f'{server_name}.service.consul', "A")
dnsanswer_srv = consul_resolver.resolve(f"{server_name}.service.consul", "SRV")
except DNSException:
return None, None
return dnsanswer[0].address, dnsanswer_srv[0].port
from jaeger_client import Config
from grpc_opentracing import open_tracing_client_interceptor
from grpc_opentracing.grpcext import intercept_channel
from demo5.ZyxClientTracingInterceptor import ZyxCliopentracing
def grpc_opentracing_init():
# 启动时候加入链路追踪的拦截器
config = Config(
# usually read from some yaml config---意思是配置信息其实可以从某个yaml文件进行读取
config={
# sampler 采样
'sampler': {
'type': 'const', # 采样类型
'param': 1, # 采样开关 1:开启全部采样 0:关闭全部
},
# 配置链接到我们的本地的agent,通过agent来上报
'local_agent': {
# 注意这里是指定了JaegerAgent的host和port。
# 根据官方建议为了保证数据可靠性,JaegerClient和JaegerAgent运行在同一台主机内,因此reporting_host填写为127.0.0.1。
'reporting_host': '127.0.0.1',
'reporting_port': 6831,
},
'logging': True,
},
# 这里填写应用名称---服务的名称
service_name="grpc_client",
validate=True
)
tracer = config.initialize_tracer()
# 注意导入的是客户端的的拦截器的
tracer_interceptor = open_tracing_client_interceptor(tracer)
return tracer_interceptor,tracer
def grpc_opentracing_init():
# 启动时候加入链路追踪的拦截器
config = Config(
# usually read from some yaml config---意思是配置信息其实可以从某个yaml文件进行读取
config={
# sampler 采样
'sampler': {
'type': 'const', # 采样类型
'param': 1, # 采样开关 1:开启全部采样 0:关闭全部
},
# 配置链接到我们的本地的agent,通过agent来上报
'local_agent': {
# 注意这里是指定了JaegerAgent的host和port。
# 根据官方建议为了保证数据可靠性,JaegerClient和JaegerAgent运行在同一台主机内,因此reporting_host填写为127.0.0.1。
'reporting_host': '127.0.0.1',
'reporting_port': 6831,
},
'logging': True,
},
# 这里填写应用名称---服务的名称
service_name="grpc_client",
validate=True
)
tracer = config.initialize_tracer()
return tracer
tracer_interceptor = grpc_opentracing_init()
def run():
compression = grpc.Compression.Gzip
# 添加客户端的请求拦截器
# 单纯的子节点上传了,父节点没有上传
interceptors = [ZyxCliopentracing(tracer_interceptor=tracer_interceptor)]
with grpc.insecure_channel(target=f'127.0.0.1:45564',compression=compression) as channel:
# 通过通道服务一个服务intercept_channel
interceptor_channel = grpc.intercept_channel(channel,*interceptors)
stub = hello_pb2_grpc.GreeterStub(interceptor_channel)
# 生成请求我们的服务的函数的时候,需要传递的参数体,它放在hello_pb2里面-请求体为:hello_pb2.HelloRequest对象
try:
# reest_header = (
# ('mesasge', '1010'),
# ('error', 'No Error')
# )
response, callbask = stub.SayHelloAgain.with_call(request=hello_pb2.HelloRequest(name='欢迎下次光临'),
# 设置请求的超时处理
timeout=5,
# 设置请求的头的信息
# metadata=reest_header,
)
print("SayHelloAgain函数调用结果的返回: " + response.message)
print("SayHelloAgain函数调用结果的返回---响应报文头信息: ", callbask.trailing_metadata())
except grpc._channel._InactiveRpcError as e:
print(e.code())
print(e.details())
finally:
# 关闭追踪
pass
time.sleep(2)
tracer_interceptor.close()
time.sleep(3)
if __name__ == '__main__':
import time
for _ in range(1):
time.sleep(1)
run()
然后最后测试验证查看链路:

完美的实现了!
查看输出信息:客户端:
审核当回事当
metadatametadatametadata (('uber-trace-id', 'fba029d81d1b0226:b058e6ffc5614c5e:0:1'),)
SayHelloAgain函数调用结果的返回: hello 欢迎下次光临
SayHelloAgain函数调用结果的返回---响应报文头信息: (_Metadatum(key='name', value='223232'), _Metadatum(key='sex', value='23232'))

服务端:
审核当回事当
启动端口!!!!45564
奇葩了!q1!contextcontext!!!!! <bound method _Context.code of <grpc._server._Context object at 0x000002552EA8C588>>
接收到的请求头元数据信息 (_Metadatum(key='uber-trace-id', value='fba029d81d1b0226:b058e6ffc5614c5e:0:1'), _Metadatum(key='user-agent', value='grpc-python/1.41.1 grpc-c/19.0.0 (windows; chttp2)'))

注意识相点有:
客户端不能重复实例化 发送客户端的 ot_tags.SPAN_KIND_RPC_CLIENT 是客户端类型 建议使用下面的opentracing.Format.TEXT_MAP格式的

总结
以上仅仅是个人结合自己的实际需求,做学习的实践笔记!如有笔误!欢迎批评指正!感谢各位大佬!
结尾
END
简书:https://www.jianshu.com/u/d6960089b087
掘金:https://juejin.cn/user/2963939079225608
公众号:微信搜【小儿来一壶枸杞酒泡茶】
小钟同学 | 文 【欢迎一起学习交流】| QQ:308711822




