Oracle support
Debezium supports using Oracle as a source for extraction. It works by capturing change events from the database logs through Oracle's XStream API. However, the XStream API requires a license for the Oracle GoldenGate product, so using it without having purchased a GoldenGate license carries legal risk and is not recommended.
Another point worth noting: Debezium's main code base is hosted at https://github.com/debezium/debezium, while the Oracle, Cassandra, and other connectors are still "incubating" at https://github.com/debezium/debezium-incubator.
The Kafka Connect rebalance problem
This problem is extremely common; before Kafka Connect 2.3 was released, almost every Debezium user ran into it.
Kafka Connect runs in distributed mode. Before version 2.3, every time a connector was added, modified, or deleted, a cluster-wide rebalance was triggered and all connectors in the cluster were stopped and restarted. Because restarting a Debezium connector is a heavy operation, such restarts could leave several connectors in the cluster in a failed state. Since version 2.3, Kafka Connect uses an incremental cooperative rebalancing design and no longer stops and restarts every connector during a rebalance; see http://kafka.apache.org/documentation/#upgrade_230_notable for details.
We therefore recommend upgrading Kafka Connect to version 2.3 or later.
restart_lsn never advances
After Debezium had been running for a few days, the source PostgreSQL instance suddenly raised a disk-usage alarm. Investigation showed that the restart_lsn of some replication slots was not advancing, so PostgreSQL had accumulated a large backlog of WAL segments that it could not recycle.
Looking at the state of the replication slots:
postgres=# select * from pg_replication_slots;
slot_name | plugin | slot_type | datoid | database | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
-----------------+----------+-----------+--------+----------+--------+------------+------+--------------+-------------+---------------------
***36h2x1****** | wal2json | logical | 33032 | ******* | t | 64259 | | 1479674 | 7/6D592C38 | 7/6D592C38
***_***ect2 | wal2json | logical | 16390 | aaa | f | | | 1809194 | 8/51637810 | 8/51637810
***** | wal2json | logical | 33032 | ******* | t | 10457 | | 1834975 | 8/565E7CD8 | 8/5917F550
(8 rows)
All of these slots were created by Debezium and connect to the same PostgreSQL instance. Slots 1 and 3 are both active, yet their restart_lsn and confirmed_flush_lsn values are far apart from each other.
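To quantify how much WAL each slot is holding back, a query along these lines can help (a sketch, assuming PostgreSQL 10 or later; on 9.x the corresponding functions are pg_current_xlog_location and pg_xlog_location_diff):

-- Show, per slot, how far its restart_lsn lags behind the current WAL write position.
SELECT slot_name,
       active,
       restart_lsn,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots
ORDER BY pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) DESC;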
Why is this? Could it be that the consumer of the lagging slot simply cannot keep up? After attaching a debugger to the Kafka Connect process we found that both slots were receiving the latest WAL entries, which rules out that explanation.
That leaves the source code. After Debezium receives WAL from PostgreSQL it must acknowledge it back to the server, so we need to find where in the code that acknowledgement is sent:
io.debezium.connector.postgresql.connection.PostgresReplicationConnection
    @Override
    public void flushLsn(long lsn) throws SQLException {
        doFlushLsn(LogSequenceNumber.valueOf(lsn));
    }

    private void doFlushLsn(LogSequenceNumber lsn) throws SQLException {
        stream.setFlushedLSN(lsn);
        stream.setAppliedLSN(lsn);
        stream.forceUpdateStatus();
    }
io.debezium.connector.postgresql.PostgresStreamingChangeEventSource
    @Override
    public void commitOffset(Map<String, ?> offset) {
        try {
            ReplicationStream replicationStream = this.replicationStream.get();
            final Long lsn = (Long) offset.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY);
            if (replicationStream != null && lsn != null) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Flushing LSN to server: {}", LogSequenceNumber.valueOf(lsn));
                }
                // tell the server the point up to which we've processed data, so it can be free to recycle WAL segments
                replicationStream.flushLsn(lsn);
            }
            else {
                LOGGER.debug("Streaming has already stopped, ignoring commit callback...");
            }
        }
        catch (SQLException e) {
            throw new ConnectException(e);
        }
    }
io.debezium.connector.postgresql.PostgresConnectorTask
    @Override
    public void commit() throws InterruptedException {
        if (coordinator != null) {
            coordinator.commitOffset(lastOffset);
        }
    }
org.apache.kafka.connect.runtime.WorkerSourceTask
    @Override
    public void execute() {
        try {
            task.initialize(new WorkerSourceTaskContext(offsetReader, this, configState));
            task.start(taskConfig);
            ......
            while (!isStopping()) {
                ......
                toSend = poll();
                if (toSend == null) continue;
                if (!sendRecords())
                ......
            }
        } catch (InterruptedException e) {
            // Ignore and allow to exit.
        } finally {
            // It should still be safe to commit offsets since any exception would have
            // simply resulted in not getting more records but all the existing records should be ok to flush
            // and commit offsets. Worst case, task.flush() will also throw an exception causing the offset commit
            // to fail.
            commitOffsets();
        }
    }

    public boolean commitOffsets() {
        ......
        commitSourceTask();
        ......
    }

    private void commitSourceTask() {
        try {
            this.task.commit();
        } catch (Throwable t) {
            log.error("{} Exception thrown while calling task.commit()", this, t);
        }
    }
As you can see, the logic eventually leads back into Kafka Connect's main loop: only after the pending records (toSend) have been successfully sent to Kafka is commitOffsets invoked, and only inside that method does Debezium's own logic run, i.e. the acknowledgement to PostgreSQL. The conclusion is now clear: if there are never any records to send, that is, if the tables you are syncing receive no changes, commitOffsets is never called and Debezium never acknowledges anything back to PostgreSQL. Meanwhile, other tables on the instance do get updates and the WAL LSN keeps moving forward, which is why the gap between the two slots' LSNs keeps growing.
A colleague worked around this as follows:
Create a new table, include it in the set of tables Debezium syncs, and write a record to it periodically (see the sketch below).
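A minimal sketch of that workaround in SQL, assuming a hypothetical table named debezium_heartbeat and PostgreSQL 9.5+ for ON CONFLICT; the table must also be added to the connector's table include list (table.include.list, or table.whitelist on older Debezium versions):

-- Hypothetical heartbeat table; any small table in a schema the connector captures will do.
CREATE TABLE debezium_heartbeat (
    id         int PRIMARY KEY,
    updated_at timestamptz NOT NULL DEFAULT now()
);

-- Run this periodically, e.g. from a cron job every minute. Each write produces a change
-- event, which makes Kafka Connect call commitOffsets and in turn makes Debezium flush the
-- latest LSN back to PostgreSQL.
INSERT INTO debezium_heartbeat (id, updated_at)
VALUES (1, now())
ON CONFLICT (id) DO UPDATE SET updated_at = EXCLUDED.updated_at;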
Connection to PostgreSQL drops after running for a while
After running for a while, Debezium often hits the exception below; sometimes it recovers after the connector is restarted, and sometimes nothing helps:
org.apache.kafka.connect.errors.ConnectException: An exception ocurred in the change event producer. This connector will be stopped.
at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:170)
at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)
at io.debezium.connector.postgresql.PostgresConnectorTask.poll(PostgresConnectorTask.java:161)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.postgresql.util.PSQLException: Database connection failed when writing to copy
at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:1015)
at org.postgresql.core.v3.CopyDualImpl.flushCopy(CopyDualImpl.java:23)
at org.postgresql.core.v3.replication.V3PGReplicationStream.updateStatusInternal(V3PGReplicationStream.java:190)
at org.postgresql.core.v3.replication.V3PGReplicationStream.timeUpdateStatus(V3PGReplicationStream.java:181)
at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:121)
at org.postgresql.core.v3.replication.V3PGReplicationStream.read(V3PGReplicationStream.java:70)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.read(PostgresReplicationConnection.java:266)
at io.debezium.connector.postgresql.RecordsStreamProducer.streamChanges(RecordsStreamProducer.java:140)
at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$start$6(RecordsStreamProducer.java:123)
... 5 more
Caused by: java.net.SocketException: Connection reset
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at org.postgresql.core.PGStream.flush(PGStream.java:527)
at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:1012)
... 13 more
A way to reduce how often this happens is to increase PostgreSQL's wal_sender_timeout parameter appropriately; our instances use the default of 1 minute. The current values can be checked as follows (a sketch of raising the setting follows the output):
postgres=# select * from pg_settings where name like 'wal%timeout';
         name         | setting | unit |            category           |                          short_desc                          | extra_desc | context | vartype | source  | min_val |  max_val   | enumvals | boot_val | reset_val | sourcefile | sourceline | pending_restart
----------------------+---------+------+-------------------------------+---------------------------------------------------------------+------------+---------+---------+---------+---------+------------+----------+----------+-----------+------------+------------+-----------------
 wal_receiver_timeout | 60000   | ms   | Replication / Standby Servers | Sets the maximum wait time to receive data from the primary.   |            | sighup  | integer | default | 0       | 2147483647 |          | 60000    | 60000     |            |            | f
 wal_sender_timeout   | 60000   | ms   | Replication / Sending Servers | Sets the maximum time to wait for WAL replication.             |            | sighup  | integer | default | 0       | 2147483647 |          | 60000    | 60000     |            |            | f
(2 rows)
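To raise it, something along these lines should work (a sketch, assuming superuser privileges; since wal_sender_timeout has sighup context, reloading the configuration is enough and no restart is required):

-- Raise the WAL sender timeout from 1 minute to 5 minutes; the exact value is a judgment
-- call for your environment.
ALTER SYSTEM SET wal_sender_timeout = '5min';

-- Reload the configuration so the new value takes effect without a restart.
SELECT pg_reload_conf();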




