暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

源码分析——服务发现组件Netflix Eureka

程序小寨 2019-04-17
227

前言说明环境准备架构图源码解析Eureka Server启动入口Resources APIRegister & Renew...RegistryrecentlyChangedQueue剔除缓存ReplicateEureka ClientproviderConsumer总结参考资料

前言

结合Netflix Eureka 架构图, 简单分析这一服务注册和服务发现组件的源码

目的

结合Netflix Eureka架构图

  • 从源码解读Eureka架构图中服务生命周期: Register, Renew, Cancel, etc.

  • 从源码解读Eureka Peer Replicate过程, 分析其CAP指标

说明

环境准备

源码获取: https://github.com/Netflix/eureka

项目结构如下:

eureka-client
eureka-client-archaius2
eureka-client-jersey2
eureka-core
eureka-core-jersey2
eureka-examples
eureka-resources
eureka-server
eureka-server-governator
eureka-test-utils
...

项目是纯Servlet应用, 使用Gradle构建, 这里我只是作为参照并没有构建, 在集成了Spring Cloud Netflix Eureka的项目中进行源码调试

  • Spring Cloud: Finchley.SR2

  • Spring Boot: 2.0.8.RELEASE

对应的Netflix Eureka版本为: v1.9.3

$ git checkout -b v1.9.3 v1.9.3

在Spring Cloud集成Netflix Eureka的代码中, 还能看到两个与Eureka相关的包

  • spring-cloud-netflix-eureka-server

  • spring-cloud-netflix-eureka-client

这部分代码被称为胶水代码, 包含一些原生代码针对Spring项目的支持, 一些默认配置等

以及我们最熟悉的 Eureka Dashboard 界面, 都是由这部分代码完成

架构图

引用官方wiki中的架构图

所以源码分析的角度就是上图中画直线的部分, 可以总结为两部分

  • Eureka Server之间的Replicate

  • Eureka Client 与 Eureka Server之间的Register, Renew, Cancel...

对于Eureka Client, 我们还应该区分这样两个逻辑角色

  • Service Provider: 服务提供者

  • Service Consumer: 服务消费者

无论客户端发现还是服务端发现, 服务发现的本质在于能够让服务之间发现彼此, 这是服务提供方与服务消费方完成消费的前提, 对应架构图中的Client端之间的Remote Call

源码解析

Eureka Server

启动入口

随着Eureka Server启动时的一行INFO日志Started Eureka Server

来到第一个入口点: EurekaServerInitializerConfiguration#start

@Configuration
public class EurekaServerInitializerConfiguration implements ServletContextAware,
SmartLifecycle, Ordered {
 // ...
 @Override
 public void start() {
   new Thread(new Runnable() {
     @Override
     public void run() {
       try {
         //TODO: is this class even needed now?          
         eurekaServerBootstrap.contextInitialized
          (EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
         publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
         EurekaServerInitializerConfiguration.this.running = true;
         publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
} catch (Exception ex) {
// Help!
         log.error("Could not initialize Eureka servlet context", ex);
}
}
}).start();
}
// ...
}

^_^注释很有意思

这里看到了一个启动引导类EurekaServerBootstrap

public void contextInitialized(ServletContext context) {
 try {
   initEurekaEnvironment();
   initEurekaServerContext();
   context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
} catch (Throwable e) {
   log.error("Cannot bootstrap eureka server :", e);
   throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}

同样在eureka-core中也存在一个引导类EurekaBootStrap
这是Eureka Server的启动类, 代码与EurekaServerBootstrap
基本类似

作用也基本相似, 都是完成eureka的初始化工作

不同的是EurekaBootStrap
实现了Servlet API中的ServletContextListener
接口

所以在容器启动时就会调用contextInitialized()
方法完成init过程, 以及在终止时的contextDestroyed()

在这个引导类中, 还能够看到两个与配置相关的接口:

  • EurekaServerConfig eurekaServerConfig;

  • EurekaClientConfig eurekaClientConfig;

没错, Eureka Server 和 Eureka Client的配置都在这里, 他们分别对应两个实现类:

  • DefaultEurekaServerConfig implements EurekaServerConfig

  • EurekaServerConfigBean implements EurekaServerConfig

一个是提供默认配置的类, 在netflix eureka源码中, 另一个在Spring Cloud Netflix的整合代码中, 是添加了@ConfigurationProperties
的属性配置类, 前缀为eureka.server

很熟悉吧? 对应于我们平时在配置文件中配置的eureka.server.xxx
.

Eureka Client端的EurekaClientConfig与之类似, 不说了

Resources API

从上面的架构图可以看到, Eureka Client需要向Eureka Server进行注册, 发送心跳更新租约, 获取注册信息等;

同时Eureka Server之间也要进行注册信息的同步

所以在Eureka Server中, 一定有对外提供REST API的入口, 这就是Resources中的内容, 你可以认为是控制器~

这部分代码在eureka-core中的com.netflix.eureka.resources
中:

  • ApplicationResource

  • InstanceResource

  • PeerReplicationResource

  • etc...

通过类名上的@Path注解, 或是类名. 不难分辨他们分别代表什么资源

Register & Renew...

来看一个服务注册-register的API, 在ApplicationResource

@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                           @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
   // validate required fields...
   // handle cases where clients may be registering with bad DataCenterInfo with missing data
   DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
   if (dataCenterInfo instanceof UniqueIdentifier) {
       String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
       if (isBlank(dataCenterInfoId)) {
           boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
           if (experimental) {
               String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
               return Response.status(400).entity(entity).build();
          } else if (dataCenterInfo instanceof AmazonInfo) {
               AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
               String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
               if (effectiveId == null) {
                  amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
              }
          } else {
               logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
          }
      }
  }
   registry.register(info, "true".equals(isReplication));
   return Response.status(204).build();  // 204 to be backwards compatible
}

通过查看这几个Resource类的API不难发现, 真正的逻辑都在一个PeerAwareInstanceRegistry registry
中完成

注意区分registry和register

来看下它的继承和实现关系

其中LeaseManager
中定义了实例注册(register), 续约(renew), 取消(cancel), 剔除(evict)的声明

public interface LeaseManager<T> {
   void register(T r, int leaseDuration, boolean isReplication);
   boolean cancel(String appName, String id, boolean isReplication);
   boolean renew(String appName, String id, boolean isReplication);
   void evict();
}

进入其中一个实现类, 样子是这样

@Override
public void register(final InstanceInfo info, final boolean isReplication) {
   int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; // 默认90s
   if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
       leaseDuration = info.getLeaseInfo().getDurationInSecs();
  }
   super.register(info, leaseDuration, isReplication);
   replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

这里做了两件事:

  1. 调用父类register方法, 传入服务注册所需要的信息(例如服务名, 实例ID等)

  2. 调用replicateToPeers方法, 拿到所有对等的Eureka Server节点的信息, 把实例的更改信息复制到各节点中, 也就是Peer Replicate过程

这里的leaseDuration对应配置项eureka.instance.lease-expiration-duration-in-seconds
, 默认90s

Registry的流程如下:

看到这里, 剩下的就是进入父类AbstractInstanceRegistry
去查看细节了, 而Renew, Cancel的过程基本都是如此, 不说了

下面对Eureka Server中的一些重点概念进行分析

Registry

在LeaseManager中的几个实现中, 都能看到这样一个数据结构

ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry; 

如果能区分应用和实例的概念, 那就很好理解这个结构啦

翻译过来就是这样的: Map<应用名, Map<实例名, 实例的信息以及该实例续约相关的时间戳>>

举个例子, 向Eureka Server注册了两个服务server-a, server-b, 其中server-a有两个实例, 端口为8801, 8802

那么registry的结构为

{
 "SERVER-A": {
   "192.168.153.1:server-a:8802": Lease<InstanceInfo>,
   "192.168.153.1:server-a:8801": Lease<InstanceInfo>
},
 "SERVER-B": {
   "192.168.153.1:server-b:8901": Lease<InstanceInfo>
}
}

recentlyChangedQueue

ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue;

字面翻译是最近改变的队列, 在register, cancel, statusUpdate中, 都能看到对该队列添加了一个租约的变更记录

RecentlyChangedItem中存放了两个属性:

  • leaseInfo: 租约

  • lastUpdateTime: 上次更新时间

在register中, 还有一个定时任务, 会对该队列定期进行移除

Timer deltaRetentionTimer = new Timer("Eureka-DeltaRetentionTimer", true)

this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
               serverConfig.getDeltaRetentionTimerIntervalInMs(),
               serverConfig.getDeltaRetentionTimerIntervalInMs());

来看下具体要执行的任务是什么

private TimerTask getDeltaRetentionTask() {
   return new TimerTask() {
       @Override
       public void run() {
           Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
           while (it.hasNext()) {
               if (it.next().getLastUpdateTime() <
                       System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                   it.remove();
              } else {
                   break;
              }
          }
      }
  };
}

在EurekaServer的配置bean中可以找到上述代码中的两个配置的默认值

// 租约变更的过期时间
private long retentionTimeInMSInDeltaQueue = 3 * MINUTES;
// 移除租约变更记录的定时器的执行间隔时间
private long deltaRetentionTimerIntervalInMs = 30 * 1000;

关注if块中的移除条件就好, 也就是说会该任务每30秒执行一次, 移除最后更新时间距离现在超过180秒的租约记录

像不像一个距离为180s的滑动窗口?

剔除

与实例注册, 发送心跳不同的是实例的剔除是Eureka Server主动来做的, 定期剔除无效的服务

  • Server端定期执行剔除任务的默认周期为60s

    配置项: eureka.server.eviction-interval-timer-in-ms

  • 无效的服务是指未在指定时间内收到心跳(也就是未进行renew汇报的服务)

    默认的时间上限为90s, 配置项: eureka.instance.lease-expiration-duration-in-seconds

代码在这里AbstractInstanceRegistry#evict()
, 又是一个定时任务

这点从Eureka Server不停打出的INFO日志也能看出来

[a-EvictionTimer] c.n.e.registry.AbstractInstanceRegistry  : Running the evict task with compensationTime 0ms

定义如下:

private Timer evictionTimer = new Timer("Eureka-EvictionTimer", true);

缓存

ResponseCacheImpl是Eureka Server缓存的实现, 通过读写缓存来降低读写竞争, 增大并发

在服务注册, 下线, 状态改变和剔除失效服务中, 都能看到这样一个方法invalidateCache()
, 让什么缓存失效?

涉及到Eureka Server中两个很重要的缓存:

  • ResponseCacheImpl#readOnlyCacheMap

  • ResponseCacheImpl#readWriteCacheMap

从名字也能看出一个只读锁, 一个读写锁.

前者是ConcurrentMap, 后者是Guava Cache 它的写失效时间, Load方法都在ResponseCacheImpl的构造函数中定义了

而在invalidateCache()
中, 最终操作的都是readWriteCacheMap,

readOnlyCacheMap负责所有客户端读取实例信息的请求, 那么它的值从哪来, 我看到两种方式:

  • 方式一: getIfNotExist:

    ResponseCacheImpl#getValue
    中可以看到首先会从readOnlyCacheMap读, 如果不存在就从readWriteCacheMap拿, 然后放到readOnlyCacheMap中

  • 方式二: Timer

    ResponseCacheImpl#timer
    中定义了一个缓存填充的定时器, 在定时器的TaskResponseCacheImpl#getCacheUpdateTask
    中可以看到会将readWriteCacheMap的内容copy到readOnlyCacheMap

注意这两部分源码中都有一个if判断shouldUseReadOnlyResponseCache
, 对应配置项eureka.server.use-read-only-response-cache
, 默认为true, 字面翻译很清楚, 相当于开启只读缓存, 不开的话也就没有定时复制的任务了, 直接从读写缓存中拿了

Replicate

对应架构图中Eureka Server之间的Replicate

前面说过replicateToPeers()
的入口点, 这个过程实际是将instance修改信息添加到一个批量任务中打包发送给其他peer

源码参考: com.netflix.eureka.cluster.PeerEurekaNode

里面可以看到创建Batch Task的过程, 由于是异步, 所以并不能保证在服务实例状态发生梗概时, 所有Peer上的信息都一致.

Eureka Client

通过以上Eureka Server中的, 基本对架构图中的Register, Renew, Cancel过程有了基本的认识

再来简单说说Eureka Client

provider

对于Client端的Provider来说, 在启动和实例状态变化是, 需要通知Server端

源码参考(有序):

  • com.netflix.discovery.InstanceInfoReplicator#onDemandUpdate

  • com.netflix.discovery.DiscoveryClient#register

  • com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator#register

  • com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#register

流程图如下:

同样, Renew, Cancel的过程也参照这个顺序

Consumer

对于Client端的Consumer来说, 需要拉取服务实例的列表并缓存, 还需要定期更新

流程如下图

总结

Netflix Eureka是典型的注册中心+嵌入式客户端架构, 并且各节点之间对等

通过上面的分析, 对Netflix Eureka架构图中的各个过程都有了一定的了解, 以及一些配置项的细节

能够发现的一点是, 无论Eureka Server之间的Replicate过程, 还是Eureka Client向Eureka Server发起Get Service Registries的过程

实现中看到了大量的schedule和异步过程, 以及Eureka的自我保护机制, 这种模型简化了集群管理的复杂度, 易于实现高可扩展性.

但是并不能完全保证强一致, 而是最终一致性, 这在很多业务场景下是能够满足的

这也是Eureka与其它几个服务发现组件(Zookeeper, Etcd, Consul)显著的区别

在CAP理论中, Eureka保证AP, 其它几个保证CP

参考资料

  • Eureka at a glance

  • Dive into Eureka


文章转载自程序小寨,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论