
官网
Retrieves Change Data Capture (CDC) events from a MySQL database. CDC Events include INSERT, UPDATE, DELETE operations. Events are output as individual flow files ordered by the time at which the operation occurred.
个人理解
从MySQL数据库中检索更改数据捕获(CDC)事件。CDC事件包括插入、更新、删除操作。事件输出为按操作发生时的顺序排列的单个流文件。
属性详情
| 属性 | 描述 |
| MySQL Hosts | 必填,支持表达式语言 填写mysql的集群列表,多个用,隔开;格式host1:port,host2:port |
| MySQL Driver Class Name | 必填,支持表达式语言 java驱动类 5.7版本 com.mysql.jdbc.Driver 8.0版本及以上com.mysql.cj.jdbc.Driver |
| MySQL Driver Location(s) | 非必填,支持表达式语言 驱动包路径 |
| Username | 非必填,支持表达式语言 用户名 |
| Password | 非必填,支持表达式语言 密码 |
| Server ID | 非必填,支持表达式语言 该处理器能监听到mysql的binlog的事件,本质是伪装成了一个副本服务器。 既然是副本服务器则需要一个Server ID,默认65535 |
| Database/Schema Name Pattern | 一个正则表达式(regex),用于根据CDC事件列表匹配数据库(或模式,取决于RDBMS的术语)。regex必须与存储在RDBMS中的数据库名称匹配。如果未设置该属性,则不会使用数据库名称筛选CDC事件。注意:DDL事件,即使它们影响不同的数据库,也与会话执行DDL所使用的数据库相关联。这意味着,如果连接到一个数据库,但是对另一个数据库发出DDL,那么连接的数据库将是与指定模式匹配的数据库。 |
| Table Name Pattern | 表名称的正则表达式. 不填默认就是全库同步; |
| Max Wait Time | 建立连接所允许的最长时间,零表示实际上没有限制。 |
| Distributed Map Cache Client | 分布式缓存服务 我一般用 DistributedMapCacheClientService |
| Retrieve All Records | 指定是否获取所有可用的CDC事件,而与当前binlog文件名和/或位置无关。如果binlog文件名和位置值存在于处理器的状态中,则将忽略此属性的值。这允许进行4种不同的配置:1)如果Binlog数据在处理器状态下可用,则该数据用于确定开始位置,并且“检索所有记录”的值将被忽略。2)如果没有二进制日志数据处于处理器状态,则将“检索所有记录”设置为true表示从二进制日志历史记录的开头开始。3)如果没有Binlog数据处于处理器状态,并且未设置Initial Binlog文件名/位置,则Retrieve All Records设置为false意味着从Binlog历史记录的末尾开始。4)如果没有Binlog数据处于处理器状态,并且已设置Initial Binlog文件名/位置,然后将“检索所有记录”设置为false意味着从指定的初始Binlog文件/位置开始。要重置行为,请清除处理器状态(请参阅处理器文档的“状态管理”部分)。 |
| Include Begin/Commit Events | 指定是否在二进制日志中发出与BEGIN或COMMIT事件相对应的事件。如果在下游流中需要BEGIN COMMIT事件,则将其设置为true;否则,将其设置为false,这将抑制这些事件的产生并提高流性能。 |
| Include DDL Events | 指定是否在二进制日志中发出与数据定义语言(DDL)事件相对应的事件,例如ALTER TABLE,TRUNCATE TABLE。如果在下游流中需要DDL事件是必需的,则将其设置为true;否则,将其设置为false,这将抑制这些事件的生成并提高流性能。 |
| State Update Interval | 指示使用二进制日志文件/位置值更新处理器状态的频率。零值表示仅在处理器停止或关闭时才更新状态。如果在某个时候处理器状态不包含所需的binlog值,则发出的最后一个流文件将包含最后观察到的值,并且可以使用Initial Binlog File,Initial Binlog Position和Initial Sequence将处理器返回到该状态。ID属性。 |
| Initial Sequence ID | 指定在此处理器的状态不具有当前序列标识符的情况下要使用的初始序列标识符。如果处理器的状态中存在序列标识符,则将忽略此属性。序列标识符是单调递增的整数,记录由处理器生成的流文件的顺序。它们可以与EnforceOrder处理器一起使用,以保证CDC事件的有序交付。 |
| Initial Binlog Filename | 指定一个初始二进制日志文件名,如果该处理器的“状态”没有当前二进制日志文件名,则使用该文件名。如果处理器的状态中存在文件名,则将忽略此属性。如果不需要先前的事件,可以将其与初始Binlog位置一起使用以“向前跳过”。请注意,支持NiFi表达式语言,但是在配置处理器时会评估此属性,因此可能不会使用FlowFile属性。支持使用表达式语言来启用变量注册表和/或环境属性。 |
| Initial Binlog Position | 如果该处理器的State没有当前的binlog文件名,则指定要使用的binlog的初始偏移量(由Initial Binlog Filename指定)。如果处理器的状态中存在文件名,则将忽略此属性。如果不需要先前的事件,可以将其与初始Binlog文件名一起使用以“向前跳过”。请注意,支持NiFi表达式语言,但是在配置处理器时会评估此属性,因此可能不会使用FlowFile属性。支持使用表达式语言来启用变量注册表和/或环境属性。 |

其中Table Name Pattern不填默认全库cdc监听
总结
实战演练一把
首先我安装了一个mysql8.0.211,并且创建了一个数据库test_a/test_b和两张表person、person1
CREATE TABLE `person` (`id` int NOT NULL AUTO_INCREMENT,`name` varchar(45) DEFAULT NULL,`age` int DEFAULT NULL,PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8CREATE TABLE `person1` (`id` int NOT NULL AUTO_INCREMENT,`name` varchar(45) DEFAULT NULL,`age` int DEFAULT NULL,PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8
几个重要点讲一下
第一:binglog要开,mysql8自动开的,但是5.7要自己去改配置并且重启
8.0版本这个处理器可能没办法连接,报错client does not support authentic,我这样解决的
ALTER USER 'root'@'localhost' IDENTIFIED BY 'Huorong123' PASSWORD EXPIRE NEVER;ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY 'Huorong123';FLUSH PRIVILEGES;
出现这种问题的原因是:mysql8以前加密规则是mysql_native_password,而在mysql8之后加密规则是caching_sha2_password
第二:CaptureChangeMySQL依赖分布式缓存服务,模板中我用了DistributedMapCacheClientService客户端、DistributedMapCacheServer服务端走的全是默认默认端口也用的是4557。一定要这两个服务,否则解析的binlog数据没有字段,就毫无意义。下面是正常解析的一条数据

有分布式缓存服务解析结果

不带分布式缓存解析结果
第三:对于数据的解析我用的是groovy脚本
import com.fasterxml.jackson.databind.JsonNodeimport com.fasterxml.jackson.databind.ObjectMapperimport org.apache.commons.io.IOUtilsimport org.apache.nifi.flowfile.FlowFileimport org.apache.nifi.logging.ComponentLogimport org.apache.nifi.processor.ProcessContextimport org.apache.nifi.processor.ProcessSessionimport org.apache.nifi.processor.Relationshipimport org.apache.nifi.processor.io.InputStreamCallbackimport org.apache.nifi.processor.io.OutputStreamCallbackimport java.nio.charset.StandardCharsetsimport java.sql.Timeimport java.sql.TimestampProcessSession current_session = sessionProcessContext current_context = contextComponentLog current_log = logRelationship SUCCESS = REL_SUCCESSRelationship FAILURE = REL_FAILUREFlowFile flowFile = current_session.get()try {String text = '{}'current_session.read(flowFile, { inputStream -> text = IOUtils.toString(inputStream, StandardCharsets.UTF_8) } as InputStreamCallback)ObjectMapper mapper = new ObjectMapper()JsonNode data = mapper.readTree(text)if (data.isEmpty()) {current_session.transfer(flowFile, FAILURE)}JsonNode columns = data.path('columns')Iterator<JsonNode> iterable = columns.iterator()Map<String, Object> map = new HashMap<>()while (iterable.hasNext()) {JsonNode column = iterable.next()int type = column.get("column_type").asInt()Object value = convert(column, type, "value",current_log)if (value!=null){map.put(column.get("name").asText(), value)}}flowFile = current_session.write(flowFile, { outputStream ->outputStream.write(mapper.writeValueAsString(map).getBytes(StandardCharsets.UTF_8))} as OutputStreamCallback)String tableName = data.get('table_name').asText()current_session.putAttribute(flowFile, 'table_name', tableName)current_session.transfer(flowFile, SUCCESS)} catch (e) {current_log.error(e.getMessage())current_session.transfer(flowFile, FAILURE)current_context.yield()}static Object convert(JsonNode column, int type, String label, ComponentLog log) {Object value = nullJsonNode valueLabel = column.get(label)if (valueLabel.isNull()) {return value}try {switch (type) {case 12:case 1:value = valueLabel.asText()breakcase -4:value = valueLabel.asText().getBytes()breakcase 4:value = valueLabel.asLong()breakcase -6:case 5:value = valueLabel.asInt()breakcase -7:value = valueLabel.asBoolean()breakcase -5:value = new BigInteger(valueLabel.asText())breakcase 7:value = new Float(valueLabel.asText())breakcase 8:value = valueLabel.asDouble()breakcase 3:value = new BigDecimal(valueLabel.asText())breakcase 91:value = new java.util.Date(valueLabel.asText()).getTime()breakcase 92:value = new Time(new java.util.Date(valueLabel.asText()).getTime())breakcase 93:value = new Timestamp(new java.util.Date(valueLabel.asText()).getTime())breakdefault:value = valueLabel.asText()break}} catch (e) {log.error(e.getMessage())}return value}
其中这个convert函数参照这个jdbc、java、数据库数据三者对照表格;
其中91、92、93数据结构处理不一定正确,因为我也不知道mysql时间字段能不能这么直接先转换成时间long类型,再去转成实际数据类型;
groovy脚本我依赖了jackson的三个jar包,需要配置这三个jar包的路径

ExcuteScript
模板和jar包都放在模板分享链接里边了。这套模板,insert、update、delete都是支持的;


CaptureChangeMySQL

CovertJSONToSQL
模板分享
链接:https://pan.baidu.com/s/1ALS03Prcyzs23S2-_FgyOQ
提取码:aqb6




