暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Debezium常见问题

洞见大数据 2021-06-10
5039

对Oracle的支持

Debezium支持以Oracle为源进行抽取。抽取原理为:以Oracle提供的XStream API,从日志中捕获变更事件。但是XStream API需要Oracle GoldenGate产品的License,因此在没有购买GoldenGate License的场景下使用存在法律风险,不推荐。
另外一点,Debezium的主要代码托管在https://github.com/debezium/debezium,而Oracle, Cassandra等connector代码还在“孵化”中,位于https://github.com/debezium/debezium-incubator

Kafka Connect rebalance问题

这个问题非常普遍,在Kafka Connect2.3版本出来之前,几乎所有Debezium用户都有碰到。
Kafka Connect运行在distribute模式,在2.3版本之前,每当有connector新增、修改或删除后,就会触发集群的rebalance,然后集群内部所有connector停止并重启。但由于Debezium connector重启是很重的操作,导致重启有可能造成集群内多个connector挂掉。2.3版本之后,Kafka Connect采用了incremental cooperative rebalancing design,在reblance期间不会停止并重启所有connector,详见:http://kafka.apache.org/documentation/#upgrade_230_notable
因此建议升级Kafka Connect到2.3版本。

restart_lsn一直不动

Debezium运行几天后,突然源库pg磁盘告警。排查后发现pg某些复制槽的lsn一直不动,导致pg积压太多wal日志无法清理。
如下图所示,观察pg复制槽的状态:

postgres=# select * from pg_replication_slots;
    slot_name    |  plugin  | slot_type | datoid | database | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn 
-----------------+----------+-----------+--------+----------+--------+------------+------+--------------+-------------+---------------------
 ***36h2x1****** | wal2json | logical   |  33032 | *******  | t      |      64259 |      |      1479674 | 7/6D592C38  | 7/6D592C38
 ***_***ect2     | wal2json | logical   |  16390 | aaa      | f      |            |      |      1809194 | 8/51637810  | 8/51637810
 *****           | wal2json | logical   |  33032 | *******  | t      |      10457 |      |      1834975 | 8/565E7CD8  | 8/5917F550
(8 rows)

所有slot都是Debezium创建的,连接的都是同一台pg实例,其中第1个与第3个slot均为active状态,但restart_lsn和confirmed_flush_lsn却相差甚远。
这是为什么?难不成slot3的处理速度跟不上slot1?对Kafka Connect进程断点调试之后发现这两个slot都能收到最新的wal日志,因此排除了这种可能。
那就只能从源码着手,Debezium在收到pg的wal日志之后,要向pg进行应答,我们就要找出源码里何处进行应答:

io.debezium.connector.postgresql.connection.PostgresReplicationConnection

@Override
public void flushLsn(long lsn) throws SQLException {
    doFlushLsn(LogSequenceNumber.valueOf(lsn));
}

private void doFlushLsn(LogSequenceNumber lsn) throws SQLException {
    stream.setFlushedLSN(lsn);
    stream.setAppliedLSN(lsn);

    stream.forceUpdateStatus();
}

io.debezium.connector.postgresql.PostgresStreamingChangeEventSource

@Override
public void commitOffset(Map<String, ?> offset) {
    try {
        ReplicationStream replicationStream = this.replicationStream.get();
        final Long lsn = (Long) offset.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY);
        if (replicationStream != null && lsn != null) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Flushing LSN to server: {}", LogSequenceNumber.valueOf(lsn));
            }
            // tell the server the point up to which we've processed data, so it can be free to recycle WAL segments
            replicationStream.flushLsn(lsn);
        }
        else {
            LOGGER.debug("Streaming has already stopped, ignoring commit callback...");
        }
    }
    catch (SQLException e) {
        throw new ConnectException(e);
    }
}

io.debezium.connector.postgresql.PostgresConnectorTask

@Override
public void commit() throws InterruptedException {
    if (coordinator != null) {
        coordinator.commitOffset(lastOffset);
    }
}

org.apache.kafka.connect.runtime.WorkerSourceTask

@Override
public void execute() {
    try {
        task.initialize(new WorkerSourceTaskContext(offsetReader, this, configState));
        task.start(taskConfig);
        ......
        while (!isStopping()) {
            ......
            toSend = poll();
            if (toSend == nullcontinue;
            if (!sendRecords())
                ......
        }
    } catch (InterruptedException e) {
        // Ignore and allow to exit.
    } finally {
        // It should still be safe to commit offsets since any exception would have
        // simply resulted in not getting more records but all the existing records should be ok to flush
        // and commit offsets. Worst case, task.flush() will also throw an exception causing the offset commit
        // to fail.
        commitOffsets();
    }
}

public boolean commitOffsets() {
    ......
    commitSourceTask();
    ......
}

private void commitSourceTask() {
    try {
        this.task.commit();
    } catch (Throwable t) {
        log.error("{} Exception thrown while calling task.commit()"this, t);
    }
}

可以看到逻辑最终回到了Kafka Connect的主流程代码里,大致意思是将待发送的记录(toSend)发送到kafka成功之后,才会执行commitOffsets方法,该方法中再执行Debezium的逻辑,即向pg发送应答。因此,结论很明显了,如果一直没有待发送的记录,也就是你同步的表没有更新,commitOffsets永远不会执行,Debezium也永远不会向pg发送应答。与此同时,其他表有更新,lsn一直向前,这就是为什么2个slot的lsn差距越拉越大的原因。
有同事采用如下方法解决:
新建一张表,也配在Debezium同步范围之内,并定期写入记录。

运行一段时间后与pg连接中断

运行一段时间的Debezium经常出现这个异常,有时重启connector后恢复正常,有时死活不行:

org.apache.kafka.connect.errors.ConnectException: An exception ocurred in the change event producer. This connector will be stopped.
at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:170)
at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)
at io.debezium.connector.postgresql.PostgresConnectorTask.poll(PostgresConnectorTask.java:161)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.postgresql.util.PSQLException: Database connection failed when writing to copy
at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:1015)
at org.postgresql.core.v3.CopyDualImpl.flushCopy(CopyDualImpl.java:23)
at org.postgresql.core.v3.replication.V3PGReplicationStream.updateStatusInternal(V3PGReplicationStream.java:190)
at org.postgresql.core.v3.replication.V3PGReplicationStream.timeUpdateStatus(V3PGReplicationStream.java:181)
at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:121)
at org.postgresql.core.v3.replication.V3PGReplicationStream.read(V3PGReplicationStream.java:70)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.read(PostgresReplicationConnection.java:266)
at io.debezium.connector.postgresql.RecordsStreamProducer.streamChanges(RecordsStreamProducer.java:140)
at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$start$6(RecordsStreamProducer.java:123)
... 5 more
Caused by: java.net.SocketException: Connection reset
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at org.postgresql.core.PGStream.flush(PGStream.java:527)
at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:1012)
... 13 more

减少此类问题出现的方法是适当调大pg的wal_sender_timeout参数,我们的pg默认是1分钟。

postgres=# select * from pg_settings where name like 'wal%timeout';
         name         | setting | unit |           category            |                          short_desc                          | extra_desc | context | vartype | 
source  | min_val |  max_val   | enumvals | boot_val | reset_val | sourcefile | sourceline | pending_restart 
----------------------+---------+------+-------------------------------+--------------------------------------------------------------+------------+---------+---------+-
--------+---------+------------+----------+----------+-----------+------------+------------+-----------------
 wal_receiver_timeout | 60000   | ms   | Replication / Standby Servers | Sets the maximum wait time to receive data from the primary. |            | sighup  | integer | 
default | 0       | 2147483647 |          | 60000    | 60000     |            |            | f
 wal_sender_timeout   | 60000   | ms   | Replication / Sending Servers | Sets the maximum time to wait for WAL replication.           |            | sighup  | integer | 
default | 0       | 2147483647 |          | 60000    | 60000     |            |            | f
(2 rows)

文章转载自洞见大数据,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论