0

Kafka Connect介绍以及在分布式部署模式下启动流程分析

民生运维人 2020-10-12
1990

1. 什么是Kafka Connect?

Kafka Connect, an open source component of Apache Kafka®, is a framework for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems.

Kafka Connect是Apache Kafka®的一个开源组件,是一个用于将Kafka与外部系统(如数据库、键值存储、搜索索引和文件系统)连接的框架。Kafka Connnect有两个核心概念:Source和Sink。Source负责导入数据到Kafka,Sink负责从Kafka导出数据,它们都被称为Connector。

KafkConnect

Kafka Connect包括如下特性:

  • Kafka connector通用框架,提供统一的集成API

  • 同时支持分布式模式和单机模式

  • REST 接口,用来查看和管理Kafka connectors

  • 自动化的offset管理,开发人员不必担心错误处理的影响

  • 分布式、可扩展

  • 流/批处理集成

2.当前已有Connector总结

如果开发人员通过producer/consumer API写了一些导入导出的功能,可以考虑尝试换成Kafka Connector,会简化代码,提高应用可扩展和容错的能力。Kafka背后的公司Confluent鼓励社区创建更多的开源的Connector,将Kafka生态圈扩大,促进Kafka Connect的应用。另外在Kafka Connect框架的支持下,我们还可自定义开发Connector,关于如何开发一个定制的Connector,可参考《Connector Developer Guide》。当前已有的Connector包括(涉及到常用开源产品,截至2020年8月18日,从Confluent官网上统计,https://www.confluent.io/hub/):

Connector NamePlugin typeAuthorLicense
Kafka Connect HDFS3SinkConfluent, Inc.Confluent Software Evaluation License
Kafka Connect HDFS 3 SourceSourceConfluent, Inc.Confluent Software Evaluation License
Kafka Connect HDFS 2 SourceSourceConfluent, Inc.Confluent Software Evaluation License
Kafka Connect HDFSSinkConfluent, Inc.Confluent Community License
Kafka Connect JDBCSource/SinkConfluent, Inc.Confluent Community License
Debezium MySQL CDC ConnectorSourceDebezium CommunityApache License 2.0
Debezium PostgreSQL CDC ConnectorSourceDebezium CommunityApache License 2.0
Debezium MongoDB CDC ConnectorSourceDebezium CommunityApache License 2.0
Kafka Connect ElasticsearchSinkConfluent, Inc.Confluent Community License
Kafka Connect Apache HBase Sink ConnectorSinkConfluent, Inc.Confluent Software Evaluation License
Kafka Connect HBase SinkSinkNishu TayalApache License 2.0
MongoDB Connector for Apache KafkaSource/SinkMongoDBApache License 2.0
Kafka Connect InfluxDBSource/SinkConfluent, Inc.Confluent Community License
Kafka Connect RedisSinkJeremy CustenborderApache License 2.0

3.Kafka Connect重要概念

Kafka connect的几个重要的概念包括:

  1. Connectors: 通过管理任务来协调数据流的高级抽象(一个Connector实例是一个逻辑Job,这个逻辑Job就是负责管理Kafka和其他系统之间数据的拷贝)
  2. Tasks: 数据写入kafka和数据从kafka读出的实现
  3. Workers: 运行connectors和tasks的进程(一个Worker里可以运行多个Connector实例,每个Connector实例里会把一个Job分成多个Task)
  4. Converters: kafka connect和其他存储系统直接发送或者接受数据之间转换数据
  5. Transforms**: 用在connect消费或者产生的记录上的简单转换逻辑**
  6. Dead Letter Queue: Connect如何处理connector错误,使用Sink Connector时用到

对于这里的Connector和上个章节中提到的Connector这里需要区分,实现Connector实例或者Connector实例需要使用的类都是定义在Connector Plugin里的,上个章节中实际指的是Connector plugin。Connector实例和Connector Plugin都可以都可以被称为“Connectors”,但可以从上下文中可以区分,例如,“安装Connector”是指Connector Plugin,“检查Connector的状态”指Connector实例。

4.Kafka Connect工作模式

Kafka Connect的工作模式分为两种,分别是standalone模式和distributed模式。

4.1 Standalone Workers

在standalone模式中,所有的工作都在一个独立的Worker进程中完成,比较简单,适合开发环境,但是由于只有一个进程,不具备容错性。

Standalone Worker启动的命令很简单,如下:

USAGE: ./connect-standalone.sh [-daemon] connect-standalone.properties

4.2 Distributed Workers

分布式模式为Kafka Connect提供了可扩展性和自动容错能力。在分布式模式下,可以启动多个Workers,这些Worker采用相同的group.id,通过自动协调(类似Consumer Group的协调均衡机制),在多个Workers之间调度执行connectors和tasks。如果增加Worker,关闭Worker或某个Worker进程意外失败,其他Workers将会检测到这种变化进行rebalance,重新分配connectors以及tasks。下图展示了一个Worker发生故障后,Task如何进行故障转移:

task-failover

Distributed Worker启动命令如下:

USAGE: ./connect-distributed.sh [-daemon] connect-distributed.properties

5. Distributed模式下启动流程分析

根据connect-distributed.sh启动脚本,我们发现入口类是ConnectDistributed:

#Distribute Worker的入口类是ConnectDistributed
exec $(dirname $0)/kafka-run-class.sh $EXTRA_ARGS org.apache.kafka.connect.cli.ConnectDistributed "$@"

以下是启动的核心代码:

/**
 * Connect启动函数
 * @param workerProps Worker的配置文件
 * @return
 */

public Connect startConnect(Map<String, String> workerProps) {
    log.info("Scanning for plugin classes. This might take a moment ...");
    //1.扫描plugin.path路径,加载插件
    Plugins plugins = new Plugins(workerProps);
    plugins.compareAndSwapWithDelegatingLoader();
    DistributedConfig config = new DistributedConfig(workerProps);
    //2.获取Kafka集群ID
    String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
    log.debug("Kafka cluster ID: {}", kafkaClusterId);

    //3.创建一个RestServer,并启动,hostname是rest.advertised.host.name指定,
    // port是rest.advertised.port指定,默认端口是8083,这个rest是供其他worker或者客户端调用
    // workerId就是有host:port指定
    RestServer rest = new RestServer(config);
    rest.initializeServer();

    URI advertisedUrl = rest.advertisedUrl();

    String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();

    //4.KafkaOffsetBackingStore用于管理offset,需要设置offset.storage.topic,否则会报错
    KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
    offsetBackingStore.configure(config);

    //5.client端参数能否被connector里参数覆盖,设置connector.client.config.override.policy参数,默认是NONE,还可以选All,Principal
    ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
            config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
            config, ConnectorClientConfigOverridePolicy.class);
    //6.worker初始化
    Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy);
    WorkerConfigTransformer configTransformer = worker.configTransformer();

    //获取InternalValueConverter(参数internal.value.converter)的值,初始化StatusBackingStore
    Converter internalValueConverter = worker.getInternalValueConverter();
  
    //7.初始化StatusBackingStore,connector和task的状态信息存在status.storage.topic这个Topic里
    StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
    statusBackingStore.configure(config);
    //8.初始化ConfigBackingStore,需要获取参数config.storage.topic的值,connector的配置信息存在config.storage.topic这个Topic里
    ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
            internalValueConverter,
            config,
            configTransformer);

    //9.初始化herder,这个类非常关键,和其他Worker进行协调工作,类似一个Consumer Group,
    // Coordinator会分配任务(connector和task)给它(RoundRobin)
    DistributedHerder herder = new DistributedHerder(config, time, worker,
            kafkaClusterId, statusBackingStore, configBackingStore,
            advertisedUrl.toString(), connectorClientConfigOverridePolicy);
    //10.初始化connect
    final Connect connect = new Connect(herder, rest);
    log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
    try {
        //11.connect真正启动的入口
        connect.start();
    } catch (Exception e) {
        log.error("Failed to start Connect", e);
        connect.stop();
        Exit.exit(3);
    }

    return connect;
}

以下是Kafka Connect启动流程的展示,包括和ConnectDistributed关联的类的关系:

Kafka Connect启动流程

接下来,我们将对上述流程中的重点步骤和类进行详细分析:

5.1 步骤1,扫描plugin.path,加载Connector插件

Plugins初始化时,将Worker的配置传给该类的构造函数,构造函数中首先会提取plugin.path的配置,这个配置的值是以逗号分隔的目录列表,指向Connector/Converters/Transforms Plugin的jar包及其依赖等文件目录。涉及到的代码如下:

public Plugins(Map<String, String> props) {
    //提取配置中plugin.path的值
    List<String> pluginLocations = WorkerConfig.pluginLocations(props);
    delegatingLoader = newDelegatingClassLoader(pluginLocations);
    delegatingLoader.initLoaders();
}

然后遍历这个插件路径,加载插件:

protected void initLoaders() {
    for (String configPath : pluginPaths) {
        initPluginLoader(configPath);
    }
    // Finally add parent/system loader.
    initPluginLoader(CLASSPATH_NAME);
    addAllAliases();
}

5.2 步骤3,构造一个Rest Server

RestServer内置了一个Jetty Server,用于接收其他Worker或客户端的请求。关于Rest Server监听的hostname和port相关的配置参数出现了多个,但是在Kafka源代码中提供的配置示例connect-dstiributed.properties中都未列明这些参数,先来看看rest.host.name和rest.port,这两个参数目前在代码里是被 标识被弃用,暂时可用:

/**
 * @deprecated As of 1.1.0.
 */

@Deprecated
public static final String REST_HOST_NAME_CONFIG = "rest.host.name";
private static final String REST_HOST_NAME_DOC
        = "Hostname for the REST API. If this is set, it will only bind to this interface.";

/**
 * @deprecated As of 1.1.0.
 */

@Deprecated
public static final String REST_PORT_CONFIG = "rest.port";
private static final String REST_PORT_DOC
        = "Port for the REST API to listen on.";
public static final int REST_PORT_DEFAULT = 8083

这两个参数是代表REST API监听哪个网络接口,可以用参数listeners代替:

public static final String LISTENERS_CONFIG = "listeners";
private static final String LISTENERS_DOC
        = "List of comma-separated URIs the REST API will listen on. The supported protocols are HTTP and HTTPS.\n" +
        " Specify hostname as 0.0.0.0 to bind to all interfaces.\n" +
        " Leave hostname empty to bind to default interface.\n" +
        " Examples of legal listener lists: HTTP://myhost:8083,HTTPS://myhost:8084";

如果使用者按照样例文件配置成如下:

#rest.host.name=
#rest.port=8083

那么会将listeners设置为HTTP://:8083,相应的代码在如下位置:

@SuppressWarnings("deprecation")
List<String> parseListeners() {
    //由于listeners未设置,那么这里返回null
    List<String> listeners = config.getList(WorkerConfig.LISTENERS_CONFIG);
    if (listeners == null || listeners.size() == 0) {
        //由于rest.host.name未设置,这里返回null
        String hostname = config.getString(WorkerConfig.REST_HOST_NAME_CONFIG);

        if (hostname == null)
            hostname = "";
        //这里listeners=[HTTP://:8083],默认的rest.port是8083
        listeners = Collections.singletonList(String.format("%s://%s:%d", PROTOCOL_HTTP, hostname, config.getInt(WorkerConfig.REST_PORT_CONFIG)));
    }

    return listeners;
}

除了上述参数,还有rest.advertised.host.name、rest.advertised.port、rest.advertised.listener这些参数,

这些参数是用于Worker之间通信的hostname和端口。

5.3 步骤4、7、8,三个BackingStore

为了保存Connector和Task的信息,启动Connect之前需要先建立3个Topic,分别由参数offset.storage.topic、status.storage.topic和config.storage.topic指定,这些Topic设置时需要注意,清理策略需要设置为compact,否则如果设置为delete可能会因为大小或时间的原因,Topic里的数据被清理掉。另外为了保证读取config信息是有序的,config.storage.topic指定的Topic必须是单分区。

上述3个Topic的读取和写入分别由KafkaOffsetBackingStore、KafkaStatusBackingStore、KafkaConfigBackingStore管理,这三个类中均有一个KafkaBasedLog的实例,在每个实例中,包含一个Consumer实例(读取offset/status/config信息),一个Producer实例(往Topic中写入offset/status/config信息),以及一个线程WorkThread,该线程一直在消费对应Topic中的消息。

这里需要注意一个offset提交方式,在Worker启动时,会构造一个SourceTaskOffsetCommiter类,一个Worker会统一对其管辖范围内SourceTask的offset提交,这个功能就是由SourceTaskOffsetCommiter类提供。

/**
 * Start worker.
 */

public void start() {
    log.info("Worker starting");

    offsetBackingStore.start();
    //构建一个SourceTaskOffsetCommiter,只针对SourceTask
    sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(config);

    connectorStatusMetricsGroup = new ConnectorStatusMetricsGroup(metrics, tasks, herder);

    log.info("Worker started");
}

offset提交是定时的,由参数offset.flush.interval.ms指定,默认值是60000L,即Task启动后,每1分钟提交一次offset。

public void schedule(final ConnectorTaskId id, final WorkerSourceTask workerTask) {
    long commitIntervalMs = config.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
    ScheduledFuture<?> commitFuture = commitExecutorService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try (LoggingContext loggingContext = LoggingContext.forOffsets(id)) {
                commit(workerTask);
            }
        }
    }, commitIntervalMs, commitIntervalMs, TimeUnit.MILLISECONDS);
    committers.put(id, commitFuture);
}

6.参考文档:

http://kafka.apache.org/documentation.html#connect

https://www.confluent.io/blog/announcing-kafka-connect-building-large-scale-low-latency-data-pipelines/

https://docs.confluent.io/current/connect/

作者介绍:

文乔:2012年硕士毕业后加入民生银行生产运营部系统管理中心,天眼日志平台主要参与人,目前在开源软件支持组负责Flume、Kafka的源码研究和工具开发等相关工作。微信tinawenqiao,邮箱wenqiao@cmbc.com.cn。

王健 :2011年加入民生银行科技部,数据库管理员(负责DB2,Oracle,MySQL等运维工作,对MPP等数据库有很长的维护和实施经验,擅长数据迁移等等),同时负责行内KAFKA集群运维和实施工作,负责行内数据库实时复制等工作。

李腾:2019年加入民生银行信息科技部,DBA,主要负责行内KAFKA相关运维工作。



「喜欢文章,快来给作者赞赏墨值吧」
文章转载自民生运维人,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论