暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Kafka MirrorMakerV2源码解读

大数据从业者 2021-10-19
2603

前言

前面已经梳理MirrorMakerV2(以下简称MM2)概念及实践,可以直接到文章合集-Kafka中查阅。本文主要深入MM2源码,介绍MM2的启动流程。

 

启动脚本剖析

启动脚本为connect-mirror-maker.sh,可以通过KAFKA_LOG4J_OPTS指定log4j配置文件,可以通过KAFKA_HEAP_OPTS指定堆内存(默认为-Xms256M –Xmx2G),具体类路径为:

    org.apache.kafka.connect.mirror.MirrorMaker

     

    MirrorMaker类剖析

    main方法首先通过argparse4j(第三方实现的解析命令行的工具)解析命令行参数,返回一个Namespace包装类。

      ArgumentParser parser = ArgumentParsers.newArgumentParser("connect-mirror-maker");
      parser.description("MirrorMaker 2.0 driver");
      parser.addArgument("config").type(Arguments.fileType().verifyCanRead())
      .metavar("mm2.properties").required(true)
      .help("MM2 configuration file.");
      parser.addArgument("--clusters").nargs("+").metavar("CLUSTER").required(false)
      .help("Target cluster to use for this node.");
      Namespace ns;
      try {
      ns = parser.parseArgs(args);
      } catch (ArgumentParserException e) {
      parser.handleError(e);
      Exit.exit(-1);
      return;
      }

      题外话:这个Namespace类设计的很方便。该类实际包装一个Map(存储解析出来的命令行参数KV),通过一个get()方法返回泛型类结果,然后getString()、getByte()、getShort()、getInt()、getLong()、getFloat()、getDouble()、getBoolean()、getList()等方法调用get()方法获取指定数据类型的返回结果。这样的设计很方便获取各种类型的命令行参数值,完全可以复用在别的日常开发中。Namespace具体类结构如图所示:


       

      MirrorMaker类实例化方法

      继续正题,解析connect-mirror-maker.sh命令行参数可以获取到配置文件和clusters参数。然后就可以实例化MirrorMaker类。

        MirrorMaker mirrorMaker = new MirrorMaker(config, clusters, Time.SYSTEM);

        该类的构造方法中用一个Set<SourceAndTarget>类型herderPairs变量存放配置文件中配置的replicaiton flow流向(如:x-y或者y->x)。默认情况下,即使x->y.enabled=false,x->y也会被存放到herderPairs内;除非emit.heartbeats.enabled=false或者x-y.emit.heartbeats.enabled=false时,x-y才不会被存放到herderPairs内。这么做的原因是:

        对于开启emit.hearts的replicationflow(如A->B),是需要两个herders的。一个是A->B属于真实的replicaiton flow同步topic数据,即MirrorSourceConnector;另一个是B->A属于用于监控replicaiton flow健康状态的提交heartbeats到A的MirrorHeartbeatConnector。

          log.info("Targeting clusters {}", this.clusters);
          this.herderPairs = config.clusterPairs().stream()
          .filter(x -> this.clusters.contains(x.target()))
              .collect(Collectors.toSet());

          除了上述规则,如果命令行参数显式设置了clusters参数,herderPairs内存储的SourceAndTarget pairs还需要满足target cluster属于clusters参数的子集。这么做的原因是考虑MirrorMaker优先运行在target cluster原则:该原则主要是考虑异常情况下,避免无效读写请求。

          然后,遍历herderPairs变量,每个变量(SourceAndTarget)均执行addHerder()方法。AddHerder()方法主要功能:实例化一个Kafka Connect框架Wroker类,该Worker类会运行多个线程执行多个tasks(即读Kafka或者写Kafka)。

            Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);

            再基于Worker对象,实例化Herder接口类,该接口有两个实现类:StandaloneHerder、DistributedHerder。MirrorMaker类使用的是DistributedHerder。

              Herder herder = new DistributedHerder(distributedConfig, time, worker,
              kafkaClusterId, statusBackingStore, configBackingStore,
                      advertisedUrl, CLIENT_CONFIG_OVERRIDE_POLICY, sharedAdmin);

              然后,将每一个SourceAndTarget和对应的herder,存储到Map<SourceAndTarget, Herder>类型的herders变量中。

                private final Map<SourceAndTarget, Herder> herders = new HashMap<>();
                herders.put(sourceAndTarget, herder);

                关于Herder接口和两个实现类:

                  Herder接口用于跟踪和管理workers和connectors。
                  StandaloneHerder属于单进程,使用基于内存的MemoryStatusBackingStore和MemoryConfigBackingStore,用于standalone模式Kafka Connect进程。
                  DistributedHerder使用基于Kafka的KafkaStatusBackingStore和KafkaConfigBackingStore,在多个进程之间协调workers,底层基于Kafka group membership实现group managed;每个加入group的DistributedHerder实例,上报它的configuration state。Group协调器给每个实例分配一些connectors和tasks执行,分配策略采用简单的round-ronbin方式。但这也不是绝对的,为了避免start/stop花销,herder也会采用sticky分配策略。DistributedHerder实例只运行分配给它的connectors和tasks。

                  补充下几个概念:

                    Connectors:the high level abstraction that coordinates data streaming by managing tasks
                    Tasks:the implementation of how data is copied to or from Kafka
                    Workers:the running processes that execute connectors and tasks

                    至此,MirrorMaker构造方法结束,MirrorMaker实例化完成。

                     

                    MirrorMaker类start方法

                    我们知道,上述构造方法中得到Map<SourceAndTarget,Herder>类型的herders变量。这里重新遍历下,调用每个Herder实例的start方法(即DistributedHerder类的start方法)。

                      for (Herder herder : herders.values()) {
                          try {
                              herder.start();
                          } finally {
                              startLatch.countDown();
                          }
                      }

                      DistributedHerder类实例化时,实例化了一个线程数为1的herderExecutor线程池。

                        this.herderExecutor = new ThreadPoolExecutor(1, 1, 0L,
                        TimeUnit.MILLISECONDS,
                        new LinkedBlockingDeque<Runnable>(1),
                        ThreadUtils.createThreadFactory(
                                        this.getClass().getSimpleName() + "-" + clientId + "-%d"false));

                        DistributedHerder类实现了Runnable接口,其start方法将自身提交到上述线程池运行。

                          @Override
                          public void start() {
                          this.herderExecutor.submit(this);
                          }

                          DistributedHerder类run方法通过调用startServices()方法以启动worker、statusBackingStore、configBackingStore服务。

                            protected void startServices() {
                            this.worker.start();
                            this.statusBackingStore.start();
                            this.configBackingStore.start();
                            }

                            最后,MirrorMaker类start()方法会遍历Set<SourceAndTarget>类型的herderPairs变量,每一个变量执行configureConnectors()方法,该方法最终调用DistributedHerder类的putConnectorConfig方法。

                              herderPairs.forEach(x -> configureConnectors(x));

                              putConnectorConfig将具体的SourceAndTarget所对应的replication flow任务提交到上述已经完成启动的DistributedHerder各种服务中。


                              至此,MIrrorMakerV2.0启动完成!!

                              文章转载自大数据从业者,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                              评论