暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

NiFI实时同步MySQL数据到Kafka保姆级教程!

大数据技能圈 2023-06-29
395

Apache NiFi是一个开源的、易于使用和可扩展的数据集成工具。它提供了一种可视化的方式来设计、管理和执行数据流。NiFi的设计目标是处理和分发大量数据的实时流。它提供了强大的数据流转和转换功能,可以将数据从各种源头(如数据库、文件系统、消息队列等)收集、转换和传输到各种目标(如数据库、文件系统、消息队列等)。

我们有一个MySQL数据库存储着大量的数据。现在,我们希望能够将这些数据实时地同步到Kafka消息队列,以便更好地支持实时数据处理和分析的需求。
为了实现这个目标,我们使用Apache NiFi作为数据同步工具,NIFI的特点如下:
实时性:使用NiFi来同步MySQL到Kafka可以实现数据的实时传输。相比于传统的批处理方式,实时数据流能够保证数据更加及时地从MySQL发送到Kafka,使得数据能够立即被消费和处理,提升了数据处理的实时性。
解耦性:将数据从MySQL同步到Kafka可以实现数据的解耦。通过使用NiFi的强大功能,我们可以灵活地定义数据的流向和目标,消除了数据生产者和消费者之间的耦合,使得数据处理和分析的过程更加灵活和可扩展。
分发和复制:使用NiFi同步MySQL到Kafka,我们可以轻松实现数据的分发和复制。通过配置NiFi的流程,我们可以将来自MySQL的数据同时发送到多个Kafka主题中,以支持多个消费者和不同的数据处理流程,提供更大的灵活性和数据处理的能力。
可靠性和容错性:NiFi提供了强大的数据处理和流量控制功能,能够确保MySQL到Kafka的同步过程具有高可靠性和容错性。即使在同步过程中出现错误或者故障,NiFi能够自动进行故障恢复,并保证数据的完整性和一致性,极大地提升了数据同步的可靠性和稳定性。
一、安装
1. Windows系统下解压nifi安装包,同时下载mysql-connect-java-x.x.x.x.jar

2. 双击run-nifi.bat启动

3. 需要注意的是,JDK版本要是11及以上。

二、配置任务

1.配置GenerateTableFetch,用来增量抽取数据

需要链接数据源连接池,同时指定表、字段及用来规定增量抽取开始的位置,一般是时间戳或者主键

点击设置数据连接池

点击设置enable

enable

2. 设置ExecteSQLRecord,用来将数据转成json

选择数据源,及Record Writer,我们选择JsonRecordSetWriter

设置JsonRecordSetWriter

3.设置PublishKafka,用来向Kafka发送数据

设置Broker、topic

使用kafka consumer可以查看发送过来的消息

4.设置ConsumerKafka,用来消费Kafka消息

设置broker,topic

5.设置SplitJson,用来将数组中的JSON串切分出来

如果是根目录的话,穿参数$就可以,有其他KEY,可以用$.KEY

6.设置ConvertJSONToSQL,用来将JSON转成SQL

配置目标库、表,及写入方式

7.配置PutSQL,用来执行SQL

配置数据源即可

8.查看效果

9.配置全量抽取

复制一个ExcuteSQLRecord 配置SQL select Query即可

执行单次运行

10.查看效果

11. 测试表建表语句

    -- test.fact_table definition


    CREATE TABLE `fact_table` (
    `id` int NOT NULL AUTO_INCREMENT,
    `field1` varchar(255) DEFAULT NULL,
    `field2` int DEFAULT NULL,
    `field3` decimal(10,2) DEFAULT NULL,
    `field4` date DEFAULT NULL,
    `field5` time DEFAULT NULL,
    `field6` tinyint(1) DEFAULT NULL,
    `field7` enum('A','B','C') DEFAULT NULL,
    `field8` text,
    `field9` blob,
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=20 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
      -- test.fact_table2 definition


      CREATE TABLE `fact_table2` (
      `id` int NOT NULL AUTO_INCREMENT,
      `field1` varchar(255) DEFAULT NULL,
      `field2` int DEFAULT NULL,
      `field3` decimal(10,2) DEFAULT NULL,
      `field4` date DEFAULT NULL,
      `field5` time DEFAULT NULL,
      `field6` tinyint(1) DEFAULT NULL,
      `field7` enum('A','B','C') DEFAULT NULL,
      `field8` text,
      `field9` blob,
      PRIMARY KEY (`id`)
      ) ENGINE=InnoDB AUTO_INCREMENT=20 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

      配置配置如图

      定时任务都配置成任务之间0S间隔,就能达到实时的效果

      查看每个步骤之间的输出效果,可以停止下游执行器,让数据积累在关系中

      右键关系查看List Queue

      点击小眼睛查看数据内容

      更多大数据相关内容请关注大数据技能圈公众号:

      文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

      评论