Debezium 的 MongoDB 连接器跟踪 MongoDB 副本集或 MongoDB 分片集群,以获取数据库和集合中的文档更改,并将这些更改记录为 Kafka 主题中的事件。 连接器会自动处理分片集群中分片的添加或删除、每个副本集成员身份的更改、每个副本集中的选举以及等待通信问题解决。
有关与此连接器兼容的 MongoDB 版本的信息,请参阅 Debezium 版本概述。
概述
MongoDB的复制机制提供了冗余和高可用性,是在生产环境中运行MongoDB的首选方式。 MongoDB 连接器捕获副本集或分片集群中的更改。
MongoDB 副本集由一组服务器组成,这些服务器都具有相同数据的副本,复制可确保客户端对副本集主服务器上的文档所做的所有更改都正确应用于其他副本集的服务器,称为辅助副本。 MongoDB复制的工作原理是让主节点在其oplog(或操作日志)中记录更改,然后每个辅助节点读取主节点的oplog,并按顺序将所有操作应用于自己的文档。 将新服务器添加到副本集时,该服务器首先对主服务器上的所有数据库和集合执行快照,然后读取主服务器的 oplog 以应用自启动快照以来可能所做的所有更改。 这个新服务器在赶上主服务器oplog的尾部时成为辅助服务器(并且能够处理查询)。
更改流
虽然Debezium MongoDB连接器不会成为副本集的一部分,但它使用类似的复制机制来获取oplog数据。 主要区别在于连接器不直接读取操作日志。 相反,它将oplog数据的捕获和解码委托给MongoDB更改流功能。 使用更改流,MongoDB服务器将集合中发生的更改公开为事件流。 Debezium 连接器监视流,然后将更改传递到下游。 连接器首次检测到副本集时,它会检查 oplog 以获取上次记录的事务,然后执行主数据库和集合的快照。 连接器完成复制数据后,它会从之前读取的 oplog 位置开始创建一个更改流。
当MongoDB连接器进程发生变化时,它会定期记录事件在oplog流中发起的位置。 当连接器停止时,它会记录它处理的最后一个 oplog 流位置,以便在重新启动后可以从该位置恢复流。 换句话说,连接器可以停止、升级或维护,并在一段时间后重新启动,并且始终准确地从中断的位置继续,而不会丢失单个事件。 当然,MongoDBoplog通常被限制在最大大小,因此如果连接器长时间停止,则oplog中的操作可能会在连接器有机会读取它们之前被清除。 在这种情况下,重新启动后,连接器会检测缺少的oplog操作,执行快照,然后继续流式传输更改。
MongoDB连接器还非常容忍副本集的成员资格和领导权的变化,分片集群内分片的添加或删除以及可能导致通信故障的网络问题。 连接器始终使用副本集的主节点流式传输更改,因此当副本集经过选举并且其他节点成为主节点时,连接器将立即停止流式传输更改,连接到新的主节点,并使用新的主节点开始流式传输更改。 同样,如果连接器无法与副本集主节点通信,它将尝试重新连接(使用指数退避,以免使网络或副本集不堪重负)。 重新建立连接后,连接器将继续流式传输它捕获的最后一个事件的更改。 通过这种方式,连接器会根据副本集成员身份的变化动态调整,并自动处理通信中断。
其他资源
MongoDB连接器的工作原理
连接器支持的 MongoDB 拓扑概述对于规划应用程序非常有用。
配置和部署 MongoDB 连接器时,它首先连接到种子地址的 MongoDB 服务器,并确定有关每个可用副本集的详细信息。 由于每个副本集都有自己独立的 oplog,因此连接器将尝试为每个副本集使用单独的任务。 连接器可以限制它将使用的最大任务数,如果没有足够的任务可用,连接器将为每个任务分配多个副本集,尽管该任务仍将为每个副本集使用单独的线程。
对分片集群运行连接器时,请使用大于副本集数的值。 这将允许连接器为每个副本集创建一个任务,并允许 Kafka Connect 跨所有可用工作进程协调、分发和管理任务。 |
支持的 MongoDB 拓扑
MongoDB连接器支持以下MongoDB拓扑:
- MongoDB 副本集
Debezium MongoDB连接器可以从单个MongoDB副本集中捕获更改。 生产副本集至少需要三个成员。
若要将 MongoDB 连接器与副本集一起使用,必须将连接器配置中的属性值设置为副本集连接字符串。 当连接器准备好开始从 MongoDB 更改流捕获更改时,它会启动连接任务。 然后,连接任务使用指定的连接字符串建立与可用副本集成员的连接。
mongodb.connection.string
由于连接器管理数据库连接的方式发生了变化,此版本的 Debezium 不再支持使用该属性来阻止连接器执行成员资格发现。 |
- MongoDB分片集群
MongoDB分片集群包括:
一个或多个分片,每个分片部署为副本集;
充当群集配置服务器的单独副本集
一个或多个路由器(也称为),客户端连接到这些路由器并将请求路由到相应的分片
mongos若要将 MongoDB 连接器与分片群集一起使用,请在连接器配置中将属性的值设置为分片群集连接字符串。
mongodb.connection.string
该属性将用于为连接器的早期版本提供配置服务器副本的主机地址的已弃用属性。 在当前版本中,用于为连接器提供 MongoDB 路由器的地址,也称为 . |
当连接器连接到分片集群时,它会发现有关表示集群中分片的每个副本集的信息。 连接器使用单独的任务来捕获每个分片的更改。 在集群中添加或删除分片时,连接器会动态调整任务数以补偿更改。 |
- MongoDB独立服务器
MongoDB连接器无法监控独立MongoDB服务器的更改,因为独立服务器没有oplog。 如果将独立服务器转换为具有一个成员的副本集,则连接器将正常工作。
MongoDB不建议在生产环境中运行独立的服务器。 有关更多信息,请参阅 MongoDB 文档。 |
逻辑连接器名称
连接器配置属性用作 MongoDB 副本集或分片集群的逻辑名称。 连接器以多种方式使用逻辑名称:作为所有主题名称的前缀,以及在记录每个副本集的更改流位置时作为唯一标识符。topic.prefix
您应该为每个MongoDB连接器提供一个唯一的逻辑名称,以有意义地描述源MongoDB系统。 我们建议逻辑名称以字母或下划线字符开头,其余字符为字母数字或下划线。
执行快照
当任务使用副本集启动时,它使用连接器的逻辑名称和副本集名称来查找描述连接器以前停止读取更改的位置的偏移量。 如果可以找到偏移量并且它仍然存在于oplog中,则任务会立即从记录的偏移位置开始进行流式更改。
但是,如果未找到偏移量或 oplog 不再包含该位置,则任务必须首先通过执行快照来获取副本集内容的当前状态。 此过程首先记录oplog的当前位置并将其记录为偏移量(以及表示快照已启动的标志)。 然后,该任务将继续复制每个集合,生成尽可能多的线程(最多为配置属性的值)以并行执行此工作。 连接器将为它看到的每个文档记录一个单独的读取事件,该读取事件将包含对象的标识符、对象的完整状态以及有关找到对象的 MongoDB 副本集的源信息。 源信息还将包括一个标志,该标志表示事件是在快照期间生成的。snapshot.max.threads
此快照将继续,直到它复制了与连接器筛选器匹配的所有集合。 如果在任务快照完成之前停止连接器,则在重新启动时,连接器将再次启动快照。
尽量避免在连接器执行任何副本集的快照时重新分配和重新配置任务。 连接器生成日志消息以报告快照的进度。 若要提供最大的控制,请为每个连接器运行单独的 Kafka Connect 群集。 |
临时快照
默认情况下,连接器仅在首次启动后运行初始快照操作。 在此初始快照之后,在正常情况下,连接器不会重复快照过程。 连接器捕获的任何未来更改事件数据仅通过流式处理过程传入。
但是,在某些情况下,连接器在初始快照期间获取的数据可能会过时、丢失或不完整。 为了提供重新捕获集合数据的机制,Debezium 包含一个执行临时快照的选项。 数据库中的以下更改可能是执行临时快照的原因:
修改连接器配置以捕获一组不同的集合。
Kafka 主题将被删除,必须重新生成。
由于配置错误或其他问题,会发生数据损坏。
您可以通过启动所谓的临时快照,为之前为其捕获快照的集合重新运行快照。 临时快照需要使用信令收集。 您可以通过向 Debezium 信令集合发送信号请求来启动临时快照。
启动现有集合的即席快照时,连接器会将内容追加到集合已存在的主题。 如果删除了以前存在的主题,如果启用了自动主题创建,Debezium 可以自动创建主题。
即席快照信号指定要包含在快照中的集合。 快照可以捕获数据库的全部内容,也可以仅捕获数据库中集合的子集。
您可以通过向信令集合发送消息来指定要捕获的集合。 将信号类型设置为 ,并提供要包含在快照中的集合的名称,如下表所述:execute-snapshotexecute-snapshotincremental
| 田 | 违约 | 价值 |
|---|---|---|
|
| 指定要运行的快照的类型。 |
| 不适用 | 一个数组,其中包含与要快照的集合的完全限定名称匹配的正则表达式。 |
触发临时快照
您可以通过将具有信号类型的条目添加到信令集合来启动即席快照。 连接器处理邮件后,将开始快照操作。 快照过程读取第一个和最后一个主键值,并将这些值用作每个集合的起点和终点。 根据集合中的条目数和配置的块大小,Debezium 将集合划分为块,并继续连续地为每个块创建快照,一次一个。execute-snapshot
目前,操作类型仅触发增量快照。 更多信息,请参见增量快照。execute-snapshot
增量快照
为了提供管理快照的灵活性,Debezium 包含一个补充快照机制,称为增量快照。 增量快照依赖于 Debezium 机制将信号发送到 Debezium 连接器。 增量快照基于 DDD-3 设计文档。
在增量快照中,Debezium 不是像初始快照那样一次捕获数据库的完整状态,而是分阶段捕获一系列可配置块中的每个集合。 您可以指定希望快照捕获的集合以及每个区块的大小。 区块大小决定了快照在数据库上的每次读取操作期间收集的行数。 增量快照的默认区块大小为 1024 行。
随着增量快照的进行,Debezium 使用水印来跟踪其进度,并维护其捕获的每个收集行的记录。 与标准的初始快照过程相比,这种分阶段捕获数据的方法具有以下优势:
您可以在流式数据捕获的同时并行运行增量快照,而不是将流式传输到快照完成。 在整个快照过程中,连接器继续从更改日志中捕获近乎实时的事件,并且两个操作都不会阻止另一个操作。
如果增量快照的进度中断,您可以恢复它而不会丢失任何数据。 该过程恢复后,快照将从停止的位置开始,而不是从头开始重新捕获集合。
您可以随时按需运行增量快照,并根据需要重复该过程以适应数据库更新。 例如,在修改连接器配置以将集合添加到其
collection.include.list属性后,可以重新运行快照。
增量快照过程
运行增量快照时,Debezium 会按主键对每个集合进行排序,然后根据配置的块大小将集合拆分为块。 它逐块工作,然后捕获块中的每个集合行。 对于它捕获的每一行,快照都会发出一个事件。 该事件表示区块快照开始时行的值。READ
随着快照的进行,其他进程可能会继续访问数据库,从而可能修改集合记录。 为了反映此类更改,将按常规将 、 或 操作提交到事务日志中。 同样,正在进行的 Debezium 流进程会继续检测这些变更事件,并向 Kafka 发送相应的变更事件记录。INSERTUPDATEDELETE
Debezium 如何解决具有相同主键的记录之间的冲突
在某些情况式处理发出的 or 事件是按顺序接收的。 也就是说,流式处理过程可能会发出一个事件,该事件在快照捕获包含该行的事件的区块之前修改收集行。 当快照最终发出行的相应事件时,其值已被取代。 为了确保以正确的逻辑顺序处理不按顺序到达的增量快照事件,Debezium 采用缓冲方案来解决冲突。 只有在解决了快照事件和流事件之间的冲突后,Debezium 才会向 Kafka 发出事件记录。UPDATEDELETEREADREAD
快照窗口
为了帮助解决延迟到达事件和修改同一集合行的流事件之间的冲突,Debezium 采用了所谓的快照窗口。 快照窗口划分增量快照捕获指定集合区块的数据的时间间隔。 在块的快照窗口打开之前,Debezium 遵循其常规行为,并从事务日志中直接向目标 Kafka 主题下游发出事件。 但是,从特定块的快照打开的那一刻起,直到它关闭,Debezium 都会执行重复数据消除步骤来解决具有相同主键的事件之间的冲突。READ
对于每个数据收集,Debezium 都会发出两种类型的事件,并将这两种事件的记录存储在单个目标 Kafka 主题中。 它直接从表中捕获的快照记录作为操作发出。 同时,随着用户不断更新数据收集中的记录,并且事务日志会更新以反映每次提交,Debezium 会针对每次更改发出或操作。READUPDATEDELETE
当快照窗口打开并且 Debezium 开始处理快照块时,它会将快照记录传送到内存缓冲区。 在快照窗口期间,缓冲区中事件的主键将与传入流事件的主键进行比较。 如果未找到匹配项,则流事件记录将直接发送到 Kafka。 如果 Debezium 检测到匹配项,它将丢弃缓冲事件,并将流记录写入目标主题,因为流事件在逻辑上取代静态快照事件。 区块的快照窗口关闭后,缓冲区仅包含不存在相关事务日志事件的事件。 Debezium 将这些剩余事件发送到集合的 Kafka 主题。READREADREADREAD
连接器对每个快照区块重复该过程。
增量快照需要主键稳定排序。但是,可能无法保证编码和特殊字符的稳定排序 可能导致意外行为(Mongo 排序 |
要将增量快照与分片 MongoDB 集群一起使用,您必须为以下属性设置特定值:
|
触发增量快照
目前,启动增量快照的唯一方法是将临时快照信号发送到源数据库上的信令集合。
通过使用 MongoDB 方法将信号提交到信令集合。insert()
Debezium 检测到信令集合中的更改后,它会读取信号,并运行请求的快照操作。
您提交的查询指定要包含在快照中的集合,并选择性地指定快照操作的类型。 目前,快照操作的唯一有效选项是默认值 。incremental
要指定要包含在快照中的集合,请提供一个列出集合的数组或用于匹配集合的正则表达式数组,例如,data-collections{"data-collections": ["public.Collection1", "public.Collection2"]}
增量快照信号的数组没有默认值。 如果阵列为空,Debezium 会检测到不需要执行任何操作,并且不会执行快照。data-collectionsdata-collections
如果要包含在快照中的集合的名称在数据库、架构或表的名称中包含点 (),若要将集合添加到数组中,则必须用双引号转义名称的每个部分。 |
先决条件
源数据库上存在信令数据收集。
信令数据收集在
signal.data.collection属性中指定。
使用源信令通道触发增量快照
将快照信号文档插入信令集合:
<signalDataCollection>.insert({"id" : _<idNumber>,"type" : <snapshotType>, "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": <snapshotType>}});例如
db.debeziumSignal.insert({ "type" : "execute-snapshot", "data" : { "data-collections" ["\"public\".\"Collection1\"", "\"public\".\"Collection2\""], "type": "incremental"} });命令中 、 和参数的值对应于信令集合的字段。
idtypedata示例中的参数说明如下表所示:
表 2.MongoDB insert() 命令中用于向信令集合发送增量快照信号的字段描述 项目 价值 描述 1
db.debeziumSignal指定源数据库上信令集合的完全限定名称。
2
零
该参数指定分配为信号请求标识符的任意字符串。
前面示例中的 insert 方法省略了可选参数的使用。 由于文档没有显式为参数分配值,因此MongoDB自动分配给文档的任意id将成为信号请求的标识符。
使用此字符串标识将消息记录到信令集合中的条目。 Debezium 不使用此标识符字符串。 相反,在快照期间,Debezium 会生成自己的字符串作为水印信号。_idid_ididid3
execute-snapshot指定参数 指定信号要触发的操作。
type4
data-collections信号字段的必需组件,用于指定集合名称或正则表达式的数组,以匹配要包含在快照中的集合名称。
该数组列出了按集合的完全限定名称匹配集合的正则表达式,其格式与在signal.data.collection配置属性中指定连接器的信令集合的名称的格式相同。data5
incremental信号字段的可选组件,用于指定要运行的快照操作的类型。
目前,唯一有效的选项是默认值 .
如果未指定值,连接器将运行增量快照。typedataincremental
以下示例显示了连接器捕获的增量快照事件的 JSON。
示例:增量快照事件消息
{
"before":null,
"after": {
"pk":"1",
"value":"New data"
},
"source": {
...
"snapshot":"incremental"
},
"op":"r",
"ts_ms":"1620393591654",
"transaction":null
}| 项目 | 字段名称 | 描述 |
|---|---|---|
1 |
| 指定要运行的快照操作的类型。 |
2 |
| 指定事件类型。 |
使用卡夫卡信令通道的过程
您可以向已配置的 Kafka 主题发送消息,以请求连接器运行临时增量快照。
Kafka 消息的键必须与连接器配置选项的值匹配。topic.prefix
消息的值是带有 和 字段的 JSON 对象。typedata
信号类型为 ,并且该字段必须具有以下字段:execute-snapshotdata
| 田 | 违约 | 价值 |
|---|---|---|
|
| 要执行的快照的类型。 目前Debezium仅支持该类型。 |
| 不适用 | 逗号分隔的正则表达式数组,与要包含在快照中的表的完全限定名称匹配。 |
| 不适用 | 一个可选字符串,它指定连接器评估的条件,以指定要包含在快照中的列子集。 |
执行快照 Kafka 消息的示例:
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`Debezium 使用该字段来选择集合内容的子集。additional-condition
通常,当 Debezium 运行快照时,它会运行 SQL 查询,例如:
SELECT * FROM <tableName> ….
当快照请求包含 时,会将 追加到 SQL 查询中,例如:additional-conditionadditional-condition
SELECT * FROM <tableName> WHERE <additional-condition> ….
例如,给定一个包含列(主键)和 的集合,如果希望快照仅包含以下内容,则在请求快照时,可以附加一条语句来筛选内容:productsidcolorbrandcolor='blue'additional-condition
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue'"}}`您可以使用该语句传递基于多个列的条件。 例如,使用与上一示例中相同的集合,如果希望快照仅包含集合中的内容,并且 ,可以发送以下请求:additional-conditionproductsproductscolor='blue'brand='MyBrand'
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue' AND brand='MyBrand'"}}`停止增量快照
还可以通过向源数据库上的集合发送信号来停止增量快照。 通过将文档插入到信号集合中来提交停止快照信号。 Debezium 检测到信令集合中的更改后,它会读取信号,并在增量快照操作正在进行时停止该操作。
您提交的查询指定 的快照操作,以及要删除的当前正在运行的快照的集合(可选)。incremental
先决条件
源数据库上存在信令数据收集。
信令数据收集在
signal.data.collection属性中指定。
使用源信令通道停止增量快照
将停止快照信号文档插入信令集合:
<signalDataCollection>.insert({"id" : _<idNumber>,"type" : "stop-snapshot", "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": "incremental"}});例如
db.debeziumSignal.insert({ "type" : "stop-snapshot", "data" : { "data-collections" ["\"public\".\"Collection1\"", "\"public\".\"Collection2\""], "type": "incremental"} });信号命令中的 、 和参数的值对应于信令集合的字段。
idtypedata示例中的参数说明如下表所示:
表 4.插入命令中的字段说明,用于将停止增量快照文档发送到信令集合 项目 价值 描述 1
db.debeziumSignal指定源数据库上信令集合的完全限定名称。
2
零
前面示例中的 insert 方法省略了可选参数的使用。 由于文档没有显式为参数分配值,因此MongoDB自动分配给文档的任意id将成为信号请求的标识符。
使用此字符串标识将消息记录到信令集合中的条目。 Debezium 不使用此标识符字符串。_idid3
stop-snapshot该参数指定信号要触发的操作。
type4
data-collections信号字段的可选组件,用于指定集合名称或正则表达式的数组,以匹配要从快照中删除的集合名称。
该数组列出了按集合的完全限定名称匹配集合的正则表达式,其格式与在signal.data.collection配置属性中指定连接器的信令集合的名称的格式相同。 如果省略字段的此组件,信号将停止正在进行的整个增量快照。datadata5
incremental指定要停止的快照操作类型的信号字段的必需组件。
目前,唯一有效的选项是 。
如果未指定值,则信号无法停止增量快照。dataincrementaltype
使用 Kafka 信令通道停止增量快照
您可以向配置的 Kafka 信令主题发送信号消息以停止临时增量快照。
Kafka 消息的键必须与连接器配置选项的值匹配。topic.prefix
消息的值是带有 和 字段的 JSON 对象。typedata
信号类型为 ,并且该字段必须具有以下字段:stop-snapshotdata
| 田 | 违约 | 价值 |
|---|---|---|
|
| 要执行的快照的类型。 目前Debezium仅支持该类型。 |
| 不适用 | 逗号分隔的正则表达式的可选数组,与要包含在快照中的表的完全限定名称匹配。 |
以下示例显示了一个典型的 Kafka 消息:stop-snapshot
Key = `test_connector`
Value = `{"type":"stop-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`流式处理更改
副本集的连接器任务记录偏移量后,它使用偏移量来确定 oplog 中应开始流式传输更改的位置。 然后,任务(取决于配置)要么连接到副本集的主节点,要么连接到副本集范围的更改流,并从该位置开始流式传输更改。 它处理所有创建、插入和删除操作,并将它们转换为 Debezium 更改事件。 每个更改事件都包括操作日志中发现操作的位置,连接器会定期将其记录为其最近的偏移量。 记录偏移的时间间隔由 offset.flush.interval.ms 控制, 是 Kafka Connect 工作线程配置属性。
当连接器正常停止时,将记录上次处理的偏移量,以便在重新启动时,连接器将准确地从中断的位置继续。 但是,如果连接器的任务意外终止,则任务可能在上次记录偏移量之后但在记录最后一个偏移量之前处理并生成了事件;重新启动后,连接器从上次记录的偏移量开始,可能会生成一些与崩溃之前生成的事件相同的事件。
当 Kafka 管道中的所有组件名义上都运行时,Kafka 使用者只接收一次每条消息。 但是,当出现问题时,Kafka 只能保证消费者至少收到一次每条消息。 为了避免意外结果,使用者必须能够处理重复的消息。 |
如前所述,连接器任务始终使用副本集的主节点从 oplog 流式传输更改,确保连接器尽可能看到最新的操作,并且可以以比使用辅助节点更低的延迟捕获更改。 当副本集选择新的主节点时,连接器会立即停止流式传输更改,连接到新的主节点,并从位于相同位置的新主节点开始流式传输更改。 同样,如果连接器在与副本集成员通信时遇到任何问题,它会尝试重新连接,方法是使用指数退避,以免使副本集不堪重负,连接后,它将继续从上次中断的位置流式传输更改。 通过这种方式,连接器能够动态调整副本集成员身份的变化,并自动处理通信故障。
总而言之,MongoDB连接器在大多数情况下继续运行。通信问题可能会导致连接器等待问题解决。
前映像支持
在 MongoDB 6.0 及更高版本中,您可以配置更改流以发出文档的前映像状态,以填充 MongoDB 更改事件的字段。 要在 MongoDB 中启用前映像的使用,必须使用 、 或 来设置集合的 。 要使 Debezium MongoDB 能够在更改事件中包含前映像,请将连接器的 设置为其中一个选项。beforechangeStreamPreAndPostImagesdb.createCollection()createcollModcapture.mode*_with_pre_image
MongoDB 更改流事件的大小限制 MongoDB 更改流事件的大小限制为 16 MB。 因此,使用前映像会增加超过此阈值的可能性,从而导致失败。 有关如何避免超出更改流限制的信息,请参阅 MongoDB 文档。 |
主题名称
MongoDB 连接器将所有插入、更新和删除操作的事件写入每个集合中的文档到单个 Kafka 主题。 Kafka 主题的名称始终采用 logicalName 的形式。数据库名称。collectionName,其中 logicalName 是使用配置属性指定的连接器的逻辑名称,databaseName 是发生操作的数据库的名称,collectionName 是受影响文档所在的 MongoDB 集合的名称。topic.prefix
例如,考虑一个 MongoDB 副本集,其中包含包含四个集合的数据库:、、 和 。 如果为监视此数据库的连接器指定了逻辑名称 ,则该连接器将生成有关以下四个 Kafka 主题的事件:inventoryproductsproducts_on_handcustomersordersfulfillment
fulfillment.inventory.productsfulfillment.inventory.products_on_handfulfillment.inventory.customersfulfillment.inventory.orders
请注意,主题名称不包含副本集名称或分片名称。 因此,对分片集合(其中每个分片包含集合文档的子集)的所有更改都指向同一 Kafka 主题。
您可以将 Kafka 设置为根据需要自动创建主题。 如果没有,则必须使用 Kafka 管理工具在启动连接器之前创建主题。
分区
MongoDB 连接器不会明确确定如何对事件的主题进行分区。 相反,它允许 Kafka 确定如何根据事件键对主题进行分区。 您可以通过在 Kafka Connect 工作线程配置中定义实现的名称来更改 Kafka 的分区逻辑。Partitioner
Kafka 仅对写入单个主题分区的事件保持总顺序。 按键对事件进行分区确实意味着具有相同键的所有事件始终转到同一分区。 这可确保特定文档的所有事件始终完全有序。
事务元数据
Debezium 可以生成表示事务元数据边界的事件,并丰富变更数据事件消息。
Debezium 接收事务元数据的时间限制 Debezium 仅为部署连接器后发生的事务注册和接收元数据。 在部署连接器之前发生的事务的元数据不可用。 |
对于每个事务和,Debezium 都会生成一个包含以下字段的事件:BEGINEND
statusBEGIN或ENDid唯一事务标识符的字符串表示形式。
event_count(对于活动)END事务发出的事件总数。
data_collections(对于活动)END和 的数组对,提供源自给定数据收集的更改发出的事件数。
data_collectionevent_count
以下示例显示了一个典型消息:
{
"status": "BEGIN",
"id": "1462833718356672513",
"event_count": null,
"data_collections": null
}
{
"status": "END",
"id": "1462833718356672513",
"event_count": 2,
"data_collections": [
{
"data_collection": "rs0.testDB.collectiona",
"event_count": 1
},
{
"data_collection": "rs0.testDB.collectionb",
"event_count": 1
}
]
}除非通过 topic.transaction 选项被覆盖, 事务事件将写入名为 <topic.prefix> 的主题。.transaction
更改数据事件扩充
启用事务元数据后,将使用新字段丰富数据消息。 此字段以字段组合的形式提供有关每个事件的信息:Envelopetransaction
id唯一事务标识符的字符串表示形式。
total_order事件在事务生成的所有事件中的绝对位置。
data_collection_order事件在事务发出的所有事件中的每个数据收集位置。
下面是消息外观的示例:
{
"after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}",
"source": {
...
},
"op": "c",
"ts_ms": "1580390884335",
"transaction": {
"id": "1462833718356672513",
"total_order": "1",
"data_collection_order": "1"
}
}数据更改事件
Debezium MongoDB 连接器为每个插入、更新或删除数据的文档级操作生成数据更改事件。每个事件都包含一个键和一个值。键和值的结构取决于已更改的集合。
Debezium 和 Kafka Connect 是围绕连续的事件消息流设计的。但是,这些事件的结构可能会随着时间的推移而变化,这对消费者来说可能难以处理。为了解决此问题,每个事件都包含其内容的架构,或者,如果您使用的是架构注册表,则包含使用者可用于从注册表获取架构的架构 ID。这使得每个事件都是独立的。
以下框架 JSON 显示了更改事件的基本四个部分。但是,如何配置选择在应用程序中使用的 Kafka Connect 转换器决定了这四个部分在更改事件中的表示形式。仅当您配置转换器以生成字段时,字段才处于更改事件中。同样,仅当您配置转换器以生成更改事件时,事件键和事件有效负载才会处于更改事件中。如果使用 JSON 转换器并将其配置为生成所有四个基本更改事件部分,则更改事件具有以下结构:schema
{
"schema": {
...
},
"payload": {
...
},
"schema": {
...
},
"payload": {
...
},
}| 项目 | 字段名称 | 描述 |
|---|---|---|
1 |
| 第一个字段是事件键的一部分。它指定一个 Kafka Connect 架构,用于描述事件键部分中的内容。换句话说,第一个字段描述已更改文档的键的结构。 |
2 |
| 第一个字段是事件键的一部分。它具有上一个字段描述的结构,并且包含已更改文档的键。 |
3 |
| 第二个字段是事件值的一部分。它指定描述事件值部分中内容的 Kafka Connect 架构。换句话说,第二个描述已更改文档的结构。通常,此架构包含嵌套架构。 |
4 |
| 第二个字段是事件值的一部分。它具有上一个字段描述的结构,并且包含已更改文档的实际数据。 |
默认情况下,连接器将更改事件记录流式传输到名称与事件的原始集合相同的主题。请参阅主题名称。
MongoDB 连接器确保所有 Kafka Connect 模式名称都遵循 Avro 模式名称格式。这意味着逻辑服务器名称必须以拉丁字母或下划线开头,即 a-z、A-Z 或 _。逻辑服务器名称中的每个剩余字符以及数据库和集合名称中的每个字符必须是拉丁字母、数字或下划线,即 a-z、A-Z、0-9 或 \_。如果存在无效字符,则将其替换为下划线字符。 如果逻辑服务器名称、数据库名称或集合名称包含无效字符,并且唯一区分名称的字符无效,因此替换为下划线,则这可能会导致意外冲突。 |
更改事件键
更改事件的键包含已更改文档键的架构和已更改文档的实际键。对于给定集合,架构及其相应的有效负载都包含一个字段。 此字段的值是文档的标识符,表示为派生自 MongoDB 扩展 JSON 序列化严格模式的字符串。id
考虑逻辑名称为 的连接器 、包含数据库的副本集和包含如下文档的集合。fulfillmentinventorycustomers
示例文档
{
"_id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
}示例更改事件键
捕获集合更改的每个更改事件都具有相同的事件键架构。只要集合具有以前的定义,捕获集合更改的每个更改事件都具有以下键结构。在 JSON 中,它看起来像这样:customerscustomerscustomers
{
"schema": {
"type": "struct",
"name": "fulfillment.inventory.customers.Key",
"optional": false,
"fields": [
{
"field": "id",
"type": "string",
"optional": false
}
]
},
"payload": {
"id": "1004"
}
}| 项目 | 字段名称 | 描述 |
|---|---|---|
1 |
| 密钥的架构部分指定一个 Kafka Connect 架构,该架构描述密钥部分中的内容。 |
2 |
| 定义密钥有效负载结构的架构的名称。此架构描述已更改文档的键的结构。密钥架构名称的格式为连接器名称。数据库名称。集合名称..在此示例中:
|
3 |
| 指示事件键的字段中是否必须包含值。在此示例中,键的有效负载中的值是必需的。当文档没有键时,键的有效负载字段中的值是可选的。 |
4 |
| 指定 中应包含的每个字段,包括每个字段的名称、类型以及是否为必填字段。 |
5 |
| 包含为其生成此更改事件的文档的键。在此示例中,键包含单个类型的字段,其值为 。 |
此示例使用具有整数标识符的文档,但任何有效的 MongoDB 文档标识符的工作方式都相同,包括文档标识符。对于文档标识符,事件键的值是一个字符串,该字符串将更新文档的原始字段表示为使用严格模式的 MongoDB 扩展 JSON 序列化。下表提供了如何表示不同类型的字段的示例。payload.id_id_id
| 类型 | 蒙戈数据库值_id | 密钥的有效负载 |
|---|---|---|
整数 | 1234 |
|
浮 | 12.34 |
|
字符串 | "1234" |
|
公文 |
|
|
对象标识 |
|
|
二元的 |
|
|
更改事件值
更改事件中的值比键稍微复杂一些。与键一样,该值有一个部分和一个部分。该节包含描述节结构(包括其嵌套字段)的架构。创建、更新或删除数据的操作的更改事件都具有具有信封结构的值有效负载。schemapayloadschemaEnvelopepayload
请考虑用于显示更改事件键示例的同一示例文档:
示例文档
{
"_id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
}针对每种事件类型描述了用于更改本文档的更改事件的值部分:
创建事件
下面的示例演示连接器为在集合中创建数据的操作生成的更改事件的值部分:customers
{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Json",
"version": 1,
"field": "after"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Json",
"version": 1,
"field": "patch"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": false,
"field": "rs"
},
{
"type": "string",
"optional": false,
"field": "collection"
},
{
"type": "int32",
"optional": false,
"field": "ord"
},
{
"type": "int64",
"optional": true,
"field": "h"
}
],
"optional": false,
"name": "io.debezium.connector.mongo.Source",
"field": "source"
},
{
"type": "string",
"optional": true,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Envelope"
},
"payload": {
"after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}",
"source": {
"version": "2.3.3.Final",
"connector": "mongodb",
"name": "fulfillment",
"ts_ms": 1558965508000,
"snapshot": false,
"db": "inventory",
"rs": "rs0",
"collection": "customers",
"ord": 31,
"h": 1546547425148721999
},
"op": "c",
"ts_ms": 1558965515240
}
}| 项目 | 字段名称 | 描述 |
|---|---|---|
1 |
| 值的架构,描述值的有效负载的结构。在连接器为特定集合生成的每个更改事件中,更改事件的值架构都是相同的。 |
2 |
| 在该部分中,每个字段指定值有效负载中字段的架构。 |
3 |
|
|
4 |
|
|
5 |
| 值的实际数据。这是更改事件提供的信息。 |
6 |
| 一个可选字段,用于指定事件发生后文档的状态。 在此示例中,该字段包含新文档的 、 和 字段的值。 该值始终为字符串。 按照惯例,它包含文档的 JSON 表示形式。 MongoDB oplog 条目仅包含 _create_ 事件的文档的完整状态,当该选项设置为 ; 换句话说,创建事件是唯一一种包含 After 字段的事件,而不考虑选项。 |
7 |
| 描述事件的源元数据的必填字段。此字段包含可用于将此事件与其他事件进行比较的信息,包括事件的来源、事件发生的顺序以及事件是否属于同一事务。源元数据包括:
|
8 |
| 描述导致连接器生成事件的操作类型的必需字符串。在此示例中,指示操作创建了一个文档。有效值为:
|
9 |
| 显示连接器处理事件的时间的可选字段。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
更新事件
更改流捕获模式
示例集合中更新的更改事件的值与该集合的创建事件具有相同的架构。 同样,事件值的有效负载具有相同的结构。 但是,事件值负载在更新事件中包含不同的值。 仅当该选项设置为 时,更新事件才包含该值。 如果将选项设置为其中一个选项,则会提供一个值。 在这种情况下,有一个新的结构化字段,其中包含一些附加字段:customersaftercapture.modechange_streams_update_fullbeforecapture.mode*_with_pre_imageupdateDescription
updatedFields是一个字符串字段,其中包含更新的文档字段及其值的 JSON 表示形式removedFields是从文档中删除的字段名称的列表truncatedArrays是文档中被截断的数组的列表
下面是连接器为集合中的更新生成的事件中的更改事件值的示例:customers
{
"schema": { ... },
"payload": {
"op": "u",
"ts_ms": 1465491461815,
"before":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"unknown\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}",
"after":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}",
"updateDescription": {
"removedFields": null,
"updatedFields": "{\"first_name\": \"Anne Marie\"}",
"truncatedArrays": null
},
"source": {
"version": "2.3.3.Final",
"connector": "mongodb",
"name": "fulfillment",
"ts_ms": 1558965508000,
"snapshot": false,
"db": "inventory",
"rs": "rs0",
"collection": "customers",
"ord": 1,
"h": null,
"tord": null,
"stxnid": null,
"lsid":"{\"id\": {\"$binary\": \"FA7YEzXgQXSX9OxmzllH2w==\",\"$type\": \"04\"},\"uid\": {\"$binary\": \"47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=\",\"$type\": \"00\"}}",
"txnNumber":1
}
}
}| 项目 | 字段名称 | 描述 |
|---|---|---|
1 |
| 描述导致连接器生成事件的操作类型的必需字符串。在此示例中,指示操作更新了文档。 |
2 |
| 显示连接器处理事件的时间的可选字段。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
3 |
| 包含更改之前实际 MongoDB 文档的 JSON 字符串表示形式。 + 如果捕获模式未设置为其中一个选项,则更新事件值不包含字段。 |
4 |
| 包含实际 MongoDB 文档的 JSON 字符串表示形式。 |
5 |
| 包含文档的更新字段值的 JSON 字符串表示形式。在此示例中,更新将字段更改为新值。 |
6 |
| 描述事件的源元数据的必填字段。此字段包含与同一集合的创建事件相同的信息,但值不同,因为此事件来自 oplog 中的不同位置。源元数据包括:
|
事件中的值应作为文档的时间点值进行处理。 该值不是动态计算的,而是从集合中获取的。 因此,如果多个更新紧随其后,则所有更新更新事件都可能包含相同的值,该值将代表文档中存储的最后一个值。 如果您的应用程序依赖于逐步变化的演变,那么您应该只依赖。 |
删除事件
删除更改事件中的值与同一集合的创建和更新事件具有相同的部分。删除事件中的部分包含的值不同于同一集合的创建和更新事件。特别是,删除事件既不包含值,也不包含值。下面是集合中文档的删除事件示例:schemapayloadafterupdateDescriptioncustomers
{
"schema": { ... },
"payload": {
"op": "d",
"ts_ms": 1465495462115,
"before":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}",
"source": {
"version": "2.3.3.Final",
"connector": "mongodb",
"name": "fulfillment",
"ts_ms": 1558965508000,
"snapshot": true,
"db": "inventory",
"rs": "rs0",
"collection": "customers",
"ord": 6,
"h": 1546547425148721999
}
}
}| 项目 | 字段名称 | 描述 |
|---|---|---|
1 |
| 描述操作类型的必需字符串。字段值为 ,表示此文档已删除。 |
2 |
| 显示连接器处理事件的时间的可选字段。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
3 |
| 包含更改之前实际 MongoDB 文档的 JSON 字符串表示形式。 + 如果捕获模式未设置为其中一个选项,则更新事件值不包含字段。 |
4 |
| 描述事件的源元数据的必填字段。此字段包含与同一集合的创建或更新事件相同的信息,但值不同,因为此事件来自 oplog 中的不同位置。源元数据包括:
|
MongoDB连接器事件旨在与Kafka日志压缩一起使用。日志压缩允许删除一些较旧的消息,只要至少保留每个键的最新消息。这允许 Kafka 回收存储空间,同时确保主题包含完整的数据集,并可用于重新加载基于密钥的状态。
墓碑事件
唯一标识文档的所有MongoDB连接器事件都具有完全相同的键。删除文档后,删除事件值仍适用于日志压缩,因为 Kafka 可以删除具有相同键的所有早期消息。但是,要使 Kafka 删除具有该键的所有消息,消息值必须为 。为了实现这一点,在Debezium的MongoDB连接器发出删除事件后,连接器会发出一个特殊的逻辑删除事件,该事件具有相同的键但有一个值。逻辑删除事件通知 Kafka 可以删除具有相同键的所有消息。nullnull
设置MongoDB
MongoDB 连接器使用 MongoDB 的更改流来捕获更改,因此连接器仅适用于 MongoDB 副本集或分片集群,其中每个分片都是一个单独的副本集。 请参阅 MongoDB 文档,了解如何设置副本集或分片集群。 此外,请务必了解如何使用副本集启用访问控制和身份验证。
您还必须有一个具有适当角色的MongoDB用户来读取可以读取oplog的数据库。此外,用户还必须能够读取分片群集的配置服务器中的数据库,并且必须具有特权操作。 使用更改流(缺省值)时,用户还必须具有群集范围的权限操作和 。adminconfiglistDatabasesfindchangeStream
当您打算使用前映像并填充字段时,您需要首先使用 、 或 为集合启用。beforechangeStreamPreAndPostImagesdb.createCollection()createcollMod
云中的MongoDB
您可以将 MongoDB 的 Debezium 连接器与 MongoDB Atlas 一起使用。 请注意,MongoDB Atlas仅支持通过SSL进行安全连接,即+mongodb.ssl.enabled 连接器选项必须设置为。true
部署
要部署 Debezium MongoDB 连接器,请安装 Debezium MongoDB 连接器存档,配置连接器,然后通过将其配置添加到 Kafka Connect 来启动连接器。
先决条件
安装了 Apache Zookeeper、Apache Kafka 和 Kafka Connect。
MongoDB已安装并设置为与Debezium连接器一起使用。
程序
下载连接器的插件存档,
将 JAR 文件解压缩到 Kafka Connect 环境中。
将包含 JAR 文件的目录添加到 Kafka Connect 的
plugin.path中。重新启动 Kafka Connect 进程以选取新的 JAR 文件。
如果您使用的是不可变容器,请参阅 Debezium 的 Apache Zookeeper、Apache Kafka 和 Kafka 的容器映像 使用已安装并准备运行的 MongoDB 连接器连接。
你也可以在 Kubernetes 和 OpenShift 上运行 Debezium。
Debezium 教程将引导您使用这些图像,这是了解 Debezium 的好方法。
MongoDB连接器配置示例
下面是连接器实例的配置示例,该实例从 27017.192.168.99 上的端口 100 处的 MongoDB 副本集捕获数据,我们在逻辑上将其命名为 。 通常,您可以通过设置可用于连接器的配置属性,在 JSON 文件中配置 Debezium MongoDB 连接器。rs0fullfillment
您可以选择为特定的 MongoDB 副本集或分片集群生成事件。 (可选)可以筛选出不需要的集合。
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.connection.string": "mongodb://192.168.99.100:27017/?replicaSet=rs0",
"topic.prefix": "fullfillment",
"collection.include.list": "inventory[.]*"
}
}| 向 Kafka Connect 服务注册连接器时连接器的名称。 | |
| MongoDB 连接器类的名称。 | |
| 用于连接到 MongoDB 副本集的连接字符串。 | |
| MongoDB 副本集的逻辑名称,它为生成的事件形成命名空间,并在连接器写入的 Kafka 主题的所有名称、Kafka Connect 架构名称以及使用 Avro 转换器时相应 Avro 架构的命名空间中使用。 | |
| 与要监视的所有集合的集合命名空间(例如,<dbName>.<collectionName>)匹配的正则表达式列表。这是可选的。 |
有关可以为 Debezium MongoDB 连接器设置的配置属性的完整列表, 请参阅 MongoDB 连接器配置属性。
您可以使用命令将此配置发送到正在运行的 Kafka Connect 服务。 该服务记录配置并启动一个执行以下操作的连接器任务:POST
连接到 MongoDB 副本集或分片集群。
为每个副本集分配任务。
如有必要,执行快照。
读取更改流。
流将事件记录更改为 Kafka 主题。
添加连接器配置
要开始运行 Debezium MongoDB 连接器,请创建连接器配置,并将配置添加到 Kafka Connect 集群。
先决条件
安装了Debezium MongoDB连接器。
程序
为 MongoDB 连接器创建配置。
使用 Kafka Connect REST API 将该连接器配置添加到 Kafka Connect 集群。
结果
连接器启动后,将完成以下操作:
对 MongoDB 副本集中的集合执行一致的快照。
读取副本集的更改流。
为每个插入、更新和删除的文档生成更改事件。
流将事件记录更改为 Kafka 主题。
连接器属性
Debezium MongoDB 连接器具有许多配置属性,您可以使用这些属性为应用程序实现正确的连接器行为。 许多属性具有默认值。有关属性的信息组织如下:
除非有可用的默认值,否则以下配置属性是必需的。
| 财产 | 违约 | 描述 | ||||
|---|---|---|---|---|---|---|
无默认值 | 连接器的唯一名称。尝试使用相同的名称再次注册将失败。(此属性是所有 Kafka Connect 连接器所必需的。 | |||||
无默认值 | 连接器的 Java 类的名称。始终对 MongoDB 连接器使用值 。 | |||||
无默认值 | 指定连接器用于连接到 MongoDB 副本集的连接字符串。 此属性将替换以前版本的 MongoDB 连接器中可用的属性。
| |||||
| 指定连接器在连接到 MongoDB 集群时使用的策略。 将此属性设置为以下值之一:
| |||||
无默认值 | 标识此连接器监视的连接器和/或 MongoDB 副本集或分片集群的唯一名称。 每个服务器最多应该由一个Debezium连接器监控,因为这个服务器名称前缀了从MongoDB副本集或集群发出的所有持久化的Kafka主题。 仅使用字母数字字符、连字符、点和下划线构成名称。 逻辑名称在所有其他连接器中应该是唯一的,因为该名称用作命名从此连接器接收记录的 Kafka 主题的前缀。
| |||||
无默认值 | 连接到 MongoDB 时要使用的数据库用户的名称。仅当MongoDB配置为使用身份验证时,才需要这样做。 | |||||
无默认值 | 连接到MongoDB时使用的密码。仅当MongoDB配置为使用身份验证时,才需要这样做。 | |||||
| 包含MongoDB凭据的数据库(身份验证源)。仅当 MongoDB 配置为对另一个身份验证数据库(而不是 )使用身份验证时,才需要执行此操作。 | |||||
| 连接器将使用SSL连接到MongoDB实例。 | |||||
| 启用 SSL 后,此设置控制在连接阶段是否禁用严格的主机名检查。如果连接不能防止中间人攻击。 | |||||
空字符串 | 与要监视的数据库名称匹配的正则表达式的可选逗号分隔列表。 默认情况下,将监视所有数据库。 为了匹配数据库的名称,Debezium 会应用您指定为定位正则表达式的正则表达式。 也就是说,指定的表达式与数据库的整个名称字符串匹配;它与数据库名称中可能存在的子字符串不匹配。 | |||||
空字符串 | 与要从监视中排除的数据库名称匹配的正则表达式的可选逗号分隔列表。 设置为 后,连接器将监视除属性指定的数据库之外的每个数据库。 为了匹配数据库的名称,Debezium 会应用您指定为定位正则表达式的正则表达式。 也就是说,指定的表达式与数据库的整个名称字符串匹配;它与数据库名称中可能存在的子字符串不匹配。 | |||||
空字符串 | 一个可选的逗号分隔的正则表达式列表,与要监视的 MongoDB 集合的完全限定命名空间匹配。 默认情况下,连接器监视除 和 数据库中的集合之外的所有集合。 设置为 后,连接器仅监视属性指定的集合。 其他集合从监视中排除。 集合标识符的格式为数据库名称。集合名称。 为了匹配命名空间的名称,Debezium 应用您指定为定位正则表达式的正则表达式。 也就是说,指定的表达式与命名空间的整个名称字符串匹配;它与名称中的子字符串不匹配。 | |||||
空字符串 | 一个可选的逗号分隔的正则表达式列表,这些正则表达式与要从监视中排除的 MongoDB 集合的完全限定命名空间匹配。 设置为 后,连接器将监视除属性指定的集合之外的每个集合。 集合标识符的格式为数据库名称。集合名称。 为了匹配命名空间的名称,Debezium 应用您指定为定位正则表达式的正则表达式。 也就是说,指定的表达式与命名空间的整个名称字符串匹配;它与数据库名称中可能存在的子字符串不匹配。 | |||||
| 指定连接器启动时执行快照的条件。 将该属性设置为以下值之一:
| |||||
| 指定连接器用于从 MongoDB 服务器捕获事件更改的方法。 将此属性设置为以下值之一:
| |||||
中指定的所有集合 | 一个可选的、以逗号分隔的正则表达式列表,与要包含在快照中的架构的完全限定名称 () 匹配。 指定的项必须在连接器的 为了匹配架构的名称,Debezium 会应用您指定为定位正则表达式的正则表达式。 也就是说,指定的表达式与架构的整个名称字符串匹配;它与架构名称中可能存在的子字符串不匹配。 | |||||
空字符串 | 应从更改事件消息值中排除的字段的完全限定名称的可选逗号分隔列表。 字段的完全限定名称的格式为 databaseName。集合名称。字段名称。nestedFieldName,其中 databaseName 和 collectionName 可能包含与任何字符匹配的通配符 (*)。 | |||||
空字符串 | 一个可选的逗号分隔列表,其中包含应用于重命名更改事件消息值中的字段的完全限定替换。字段的完全限定替换的格式为 databaseName。集合名称。字段名称。nestedFieldName:newNestedFieldName,其中 databaseName 和 collectionName 可能包含与任何字符匹配的通配符 (*)、冒号字符 (:)用于确定字段的重命名映射。下一个字段替换将应用于列表中上一个字段替换的结果,因此在重命名同一路径中的多个字段时请记住这一点。 | |||||
| 指定连接器用于连接到分片集群的最大任务数。 将连接器与单个 MongoDB 副本集一起使用时,默认值是可以接受的。 但是,当集群包含多个分片时,要使 Kafka Connect 能够为每个副本集分配工作,请指定一个等于或大于集群中分片数的值。 然后,MongoDB 连接器可以使用单独的任务连接到集群中每个分片的副本集。
| |||||
| 正整数值,指定用于执行副本集中集合的初始同步的最大线程数。默认值为 1。 | |||||
| 控制删除事件后是否跟有逻辑删除事件。 | |||||
无默认值 | 连接器在启动后拍摄快照之前应等待的时间间隔(以毫秒为单位); | |||||
| 指定在拍摄快照时应一次性从每个集合中读取的最大文档数。 连接器将读取此大小的多个批次中的集合内容。 | |||||
没有 | 指定应如何调整架构名称以与连接器使用的消息转换器兼容。可能的设置:
| |||||
没有 | 指定应如何调整字段名称以与连接器使用的消息转换器兼容。可能的设置:
有关更多详细信息,请参阅阿夫罗命名。 | |||||
无默认值 | 副本集中 MongoDB 服务器的主机名和端口对(格式为“host”或“host:port”)的逗号分隔列表。该列表可以包含单个主机名和端口对。
|
以下高级配置属性具有良好的默认值,这些默认值在大多数情况下都有效,因此很少需要在连接器的配置中指定。
| 财产 | 违约 | 描述 |
|---|---|---|
| 正整数值,指定在此连接器的每次迭代期间应处理的每批事件的最大大小。默认值为 2048。 | |
| 指定阻塞队列可以容纳的最大记录数的正整数值。 当 Debezium 读取从数据库流式传输的事件时,它会在将事件写入 Kafka 之前将事件放入阻塞队列中。 阻塞队列可以为从数据库中读取更改事件提供背压 如果连接器引入消息的速度快于将消息写入 Kafka 的速度,或者当 Kafka 变得不可用时。 当连接器定期记录偏移量时,将忽略队列中保存的事件。 始终将 的值设置为大于 | |
| 一个长整数值,它指定阻塞队列的最大音量(以字节为单位)。 默认情况下,不为阻塞队列指定卷限制。 若要指定队列可以使用的字节数,请将此属性设置为正长整型值。 | |
| 正整数值,指定连接器在每次迭代期间应等待显示新更改事件的毫秒数。默认为 500 毫秒或 0.5 秒。 | |
| 正整数值,指定在第一次连接尝试失败后尝试重新连接到主设备或没有主设备可用时尝试重新连接到主数据库时的初始延迟。默认为 1 秒(1000 毫秒)。 | |
| 正整数值,指定在重复连接尝试失败或没有主设备可用后尝试重新连接到主设备时的最大延迟。默认为 120 秒(120,000 毫秒)。 | |
| 正整数值,指定在发生异常和任务中止之前与副本集主副本集的连接尝试失败的最大次数。默认值为 16,默认值为 和 导致尝试失败前的尝试时间刚刚超过 20 分钟。 | |
v2 | CDC 事件中块的架构版本。Debezium 0.10 对块的结构进行了一些重大 | |
| 控制发送检测信号消息的频率。 此属性包含一个以毫秒为单位的间隔,该间隔定义连接器将消息发送到检测信号主题的频率。 将此参数设置为 为根本不发送检测信号消息。 | |
| 在流式处理期间将跳过的操作类型的逗号分隔列表。 这些操作包括:插入/创建、更新/替换、删除、修剪以及不跳过任何上述操作。 默认情况下,为了与其他 Debezium 连接器保持一致,将跳过截断操作(不由此连接器发出)。但是,由于MongoDB不支持截断更改事件,因此这实际上与指定. | |
无默认值 | 控制快照中包含哪些集合项。此属性仅影响快照。以 databaseName.collectionName 的形式指定以逗号分隔的集合名称列表。 对于指定的每个集合,还要指定另一个配置属性:。例如,其他配置属性的名称可能是:。将此属性设置为有效的筛选表达式,该表达式仅检索快照中所需的项。当连接器执行快照时,它仅检索与筛选器表达式匹配的项目。 | |
| 设置为 Debezium 时,会生成具有事务边界的事件,并使用事务元数据丰富数据事件包络。 有关更多详细信息,请参阅事务元数据。 | |
10000(10 秒) | 发生可重试错误后重新启动连接器之前等待的毫秒数。 | |
| 连接器轮询新的、已删除的或更改的副本集的时间间隔。 | |
10000(10 秒) | 驱动程序在中止新连接尝试之前等待的毫秒数。 | |
10000(10 秒) | 群集监视器尝试访问每个服务器的频率。 | |
0 | 套接字上的发送/接收在发生超时之前可能需要的毫秒数。 值 禁用此行为。 | |
30000(30 秒) | 驱动程序在超时并引发错误之前等待选择服务器的毫秒数。 | |
无默认值 | 流式处理更改时,此设置将处理应用于作为标准 MongoDB 聚合流管道一部分的更改流事件。管道是一个MongoDB聚合管道,由数据库的指令组成,用于过滤或转换数据。这可用于自定义连接器使用的数据。 此属性的值必须是 JSON 格式的允许聚合管道阶段的数组。 请注意,这被附加到用于支持连接器的内部管道之后(例如筛选操作类型、数据库名称、集合名称等)。 | |
| 指定oplog/更改流光标在导致执行超时异常之前等待服务器生成结果的最大毫秒数。 值 表示使用服务器/驱动程序默认等待超时。 | |
无默认值 | 用于向连接器发送信号的数据收集的完全限定名称。 使用以下格式指定集合名称: | |
源 | 为连接器启用的信令通道名称列表。 默认情况下,以下通道可用:
(可选)您还可以实现自定义信令通道。 | |
无默认值 | 为连接器启用的通知通道名称列表。 默认情况下,以下通道可用:
(可选)您还可以实现自定义通知通道。 | |
| 连接器在增量快照区块期间提取并读入内存的最大文档数。 增加区块大小可提供更高的效率,因为快照运行较少的较大大小的快照查询。 但是,较大的块大小也需要更多的内存来缓冲快照数据。 将区块大小调整为可在环境中提供最佳性能的值。 | |
| 应用于确定数据更改、架构更改、事务、检测信号事件等主题名称的 TopicNamingStrategy 类的名称默认为 。 | |
| 指定主题名称的分隔符,默认为 。 | |
| 用于在有界并发哈希映射中保存主题名称的大小。此缓存将有助于确定与给定数据收集相对应的主题名称。 | |
| 控制连接器向其发送检测信号消息的主题的名称。主题名称具有以下模式: | |
| 控制连接器向其发送事务元数据消息的主题的名称。主题名称具有以下模式: | |
| 失败前可重试错误(例如连接错误)的最大重试次数(-1 = 无限制,0 = 禁用,> 0 = 重试次数)。 |
Debezium 连接器 Kafka 信号配置属性
Debezium 提供了一组属性,用于控制连接器如何与 Kafka 信号主题交互。signal.*
下表描述了 Kafka 属性。signal
| 财产 | 违约 | 描述 | ||
|---|---|---|---|---|
<主题前缀>信号 | 连接器监视临时信号的 Kafka 主题的名称。
| |||
卡夫卡信号 | Kafka 使用者使用的组 ID 的名称。 | |||
无默认值 | 连接器用于建立与 Kafka 群集的初始连接的主机/端口对的列表。 每个对都引用 Debezium Kafka Connect 进程使用的 Kafka 集群。 | |||
| 一个整数值,它指定轮询信号时连接器等待的最大毫秒数。 |
Debezium 连接器直通信号 Kafka 使用者客户端配置属性
Debezium 连接器提供信号 Kafka 消费者的直通配置。 直通信号属性以前缀 开头。 例如,连接器将属性(如)传递给 Kafka 使用者。signals.consumer.*signal.consumer.security.protocol=SSL
与数据库模式历史记录客户端的传递属性一样,Debezium 在将前缀传递给 Kafka 信号使用者之前,会从属性中删除前缀。
Debezium 连接器接收器通知配置属性
下表描述了这些属性。notification
| 财产 | 违约 | 描述 |
|---|---|---|
无默认值 | 从 Debezium 接收通知的主题的名称。 将 |
下表描述了这些属性。notification
| 财产 | 违约 | 描述 |
|---|---|---|
无默认值 | 从 Debezium 接收通知的主题的名称。 将 |
监测
Debezium MongoDB连接器除了对Zookeeper,Kafka和Kafka Connect的JMX指标的内置支持外,还具有两种指标类型。
快照衡量指标提供有关执行快照时连接器操作的信息。
流式处理指标提供有关连接器捕获更改和流式处理更改事件记录时的连接器操作的信息。
Debezium 监视文档提供了有关如何使用 JMX 公开这些指标的详细信息。
快照指标
MBean 是 。debezium.mongodb:type=connector-metrics,context=snapshot,server=<topic.prefix>,task=<task.id>
除非快照操作处于活动状态,或者自上次连接器启动以来发生了快照,否则不会公开快照指标。
下表列出了可用的 shapshot 指标。
| 属性 | 类型 | 描述 |
|---|---|---|
| 连接器已读取的最后一个快照事件。 | |
| 自连接器读取和处理最新事件以来的毫秒数。 | |
| 此连接器自上次启动或重置以来出现的事件总数。 | |
| 已按连接器上配置的包含/排除列表筛选规则筛选的事件数。 | |
| 连接器捕获的表的列表。 | |
| 队列用于在快照程序和主 Kafka Connect 循环之间传递事件的长度。 | |
| 队列的可用容量,用于在快照程序和主 Kafka Connect 循环之间传递事件。 | |
| 快照中包含的表总数。 | |
| 快照尚未复制的表数。 | |
| 快照是否已启动。 | |
| 快照是否已暂停。 | |
| 快照是否已中止。 | |
| 快照是否完成。 | |
| 快照到目前为止所花费的总秒数,即使未完成。还包括快照暂停的时间。 | |
| 快照暂停的总秒数。如果快照暂停了几次,则暂停时间相加。 | |
| 包含快照中每个表扫描的行数的地图。 表在处理过程中以增量方式添加到映射中。 每扫描 10,000 行并在完成表时更新一次。 | |
| 队列的最大缓冲区(以字节为单位)。如果 | |
| 队列中记录的当前卷(以字节为单位)。 |
Debezium MongoDB 连接器还提供以下自定义快照指标:
| 属性 | 类型 | 描述 |
|---|---|---|
|
| 数据库断开连接的次数。 |
流式处理指标
MBean 是 。debezium.mongodb:type=connector-metrics,context=streaming,server=<topic.prefix>,task=<task.id>
下表列出了可用的流式处理指标。
| 属性 | 类型 | 描述 |
|---|---|---|
| 连接器已读取的最后一个流式处理事件。 | |
| 自连接器读取和处理最新事件以来的毫秒数。 | |
| 自上次启动或指标重置以来此连接器看到的事件总数。 | |
| 自上次启动或指标重置以来此连接器看到的创建事件总数。 | |
| 自上次启动或指标重置以来此连接器看到的更新事件总数。 | |
| 自上次启动或指标重置以来此连接器看到的删除事件总数。 | |
| 已按连接器上配置的包含/排除列表筛选规则筛选的事件数。 | |
| 连接器捕获的表的列表。 | |
| 用于在流处理器和主 Kafka Connect 循环之间传递事件的队列长度。 | |
| 队列的可用容量,用于在流处理器和主 Kafka Connect 循环之间传递事件。 | |
| 指示连接器当前是否连接到数据库服务器的标志。 | |
| 上次更改事件的时间戳与处理它的连接器之间的毫秒数。 这些值将指示运行数据库服务器和连接器的计算机上时钟之间的任何差异。 | |
| 已提交的已处理事务数。 | |
| 上次接收的事件的坐标。 | |
| 上次处理的交易的事务标识符。 | |
| 队列的最大缓冲区(以字节为单位)。如果 | |
| 队列中记录的当前卷(以字节为单位)。 |
Debezium MongoDB 连接器还提供以下自定义流式处理指标:
| 属性 | 类型 | 描述 |
|---|---|---|
|
| 数据库断开连接的次数。 |
|
| 主节点选举数。 |
MongoDB连接器常见问题
Debezium 是一个分布式系统,可以捕获多个上游数据库中的所有更改,并且永远不会错过或丢失任何事件。 当系统正常运行并得到谨慎管理时,Debezium 会为每个变更事件提供一次交付。
如果发生故障,系统不会丢失任何事件。 但是,当它从故障中恢复时,它可能会重复一些更改事件。 在这种情况下,Debezium 和 Kafka 一样,至少提供一次变更事件的交付。
本节的其余部分描述了 Debezium 如何处理各种故障和问题。
配置和启动错误
在以下情况下,连接器在尝试启动时失败,在日志中报告错误或异常,并停止运行:
连接器的配置无效。
连接器无法使用指定的连接参数成功连接到 MongoDB。
失败后,连接器会尝试使用指数退避重新连接。 您可以配置最大重新连接尝试次数。
在这些情况下,错误将包含有关问题的更多详细信息,并可能提供建议的解决方法。更正配置或解决 MongoDB 问题后,可以重新启动连接器。
MongoDB 变得不可用
连接器运行后,如果任何 MongoDB 副本集的主节点变得不可用或无法访问,连接器将反复尝试重新连接到主节点,使用指数退避来防止网络或服务器饱和。如果在可配置的连接尝试次数后主连接器仍然不可用,则连接器将失败。
重新连接的尝试由三个属性控制:
connect.backoff.initial.delay.ms- 首次尝试重新连接之前的延迟,默认值为 1 秒(1000 毫秒)。connect.backoff.max.delay.ms- 尝试重新连接之前的最大延迟,默认值为 120 秒(120,000 毫秒)。connect.max.attempts- 产生错误之前的最大尝试次数,默认值为 16。
每个延迟是前一个延迟的两倍,直至最大延迟。给定默认值,下表显示了每次失败的连接尝试的延迟以及失败前的总累积时间。
| 重新连接尝试次数 | 尝试前延迟,以秒为单位 | 尝试前的总延迟时间,以分钟和秒为单位 |
|---|---|---|
1 | 1 | 00:01 |
2 | 2 | 00:03 |
3 | 4 | 00:07 |
4 | 8 | 00:15 |
5 | 16 | 00:31 |
6 | 32 | 01:03 |
7 | 64 | 02:07 |
8 | 120 | 04:07 |
9 | 120 | 06:07 |
10 | 120 | 08:07 |
11 | 120 | 10:07 |
12 | 120 | 12:07 |
13 | 120 | 14:07 |
14 | 120 | 16:07 |
15 | 120 | 18:07 |
16 | 120 | 20:07 |
卡夫卡连接进程正常停止
如果 Kafka Connect 在分布式模式下运行,并且 Kafka Connect 进程正常停止,则在该进程关闭之前,Kafka Connect 会将所有进程的连接器任务迁移到该组中的另一个 Kafka Connect 进程,并且新的连接器任务将准确地从先前任务停止的位置继续。 当连接器任务正常停止并在新进程上重新启动时,处理会有短暂的延迟。
如果组仅包含一个进程,并且该进程已正常停止,则 Kafka Connect 将停止连接器并记录每个副本集的最后一个偏移量。重新启动后,副本集任务将完全从中断的位置继续。
卡夫卡连接进程崩溃
如果 Kafka 连接器进程意外停止,则它正在运行的任何连接器任务都将终止,而不会记录其最近处理的偏移量。 当 Kafka Connect 在分布式模式下运行时,它将在其他进程上重新启动这些连接器任务。 但是,MongoDB 连接器将从早期进程记录的最后一个偏移量恢复,这意味着新的替换任务可能会生成一些在崩溃之前处理的相同更改事件。 重复事件的数量取决于偏移刷新周期和崩溃前更改的数据量。
由于在从故障中恢复期间可能会重复某些事件,因此使用者应始终预测某些事件可能会重复。Debezium 更改是幂等的,因此一系列事件总是导致相同的状态。 Debezium 还包含每个更改事件消息中有关事件起源的特定于源的信息,包括 MongoDB 事件的唯一事务标识符 () 和时间戳 ( 和 )。使用者可以跟踪这些值中的其他值,以了解它是否已经看到了特定事件。 |
卡夫卡变得不可用
当连接器生成变更事件时,Kafka Connect 框架使用 Kafka 生产者 API 在 Kafka 中记录这些事件。Kafka Connect 还将定期记录这些变更事件中出现的最新偏移量,其频率与您在 Kafka Connect 工作线程配置中指定的频率相同。如果 Kafka 代理不可用,则运行连接器的 Kafka Connect 工作进程将反复尝试重新连接到 Kafka 代理。换句话说,连接器任务将只是暂停,直到可以重新建立连接,此时连接器将准确地从中断的位置恢复。
连接器在长时间停止后失败,如果设置为snapshot.modeinitial
如果连接器正常停止,用户可能会继续对副本集成员执行操作。 连接器脱机时发生的更改将继续记录在MongoDB的oplog中。 在大多数情况下,连接器重新启动后,它会读取 oplog 中的偏移值以确定它为每个副本集流式传输的最后一个操作,然后从该点恢复流式处理更改。 重新启动后,连接器停止时发生的数据库操作将照常发送到 Kafka,一段时间后,连接器将赶上数据库。 连接器赶上所需的时间取决于 Kafka 的功能和性能以及数据库中发生的更改量。
但是,如果连接器保持停止足够长的时间间隔,则MongoDB可能会在连接器处于非活动状态期间清除oplog,从而导致有关连接器最后位置的信息丢失。 连接器重新启动后,它无法恢复流式传输,因为 oplog 不再包含标记连接器处理的最后一个操作的上一个偏移值。 连接器也无法执行快照,就像通常将属性设置为 且不存在偏移值时那样。 在这种情况下,存在不匹配,因为 oplog 不包含前一个偏移量的值,但偏移量值存在于连接器的内部 Kafka 偏移量主题中。 出现错误,连接器失败。snapshot.modeinitial
若要从故障中恢复,请删除发生故障的连接器,然后创建具有相同配置但连接器名称不同的新连接器。 启动新连接器时,它会执行快照以引入数据库的状态,然后恢复流式处理。
MongoDB丢失写入
在某些故障情况下,MongoDB可能会丢失提交,这会导致MongoDB连接器无法捕获丢失的更改。 例如,如果主节点在应用更改并将更改记录到其 oplog 后突然崩溃,则在辅助节点读取其内容之前,操作日志可能会变得不可用。 因此,被选为新主节点的辅助节点可能缺少其 oplog 中的最新更改。
目前,在MongoDB中没有办法防止这种副作用。




