
Notes on compatibility issues when connecting Flink jdbc and mysql-cdc to MySQL8

伦少的博客 (Lun Shao's blog), 2025-04-30

 

Preface

These notes record a few small problems I ran into when connecting the Flink jdbc and mysql-cdc connectors to MySQL8.

Versions

  • Flink 1.15.3
  • mysql-cdc 2.3.0
  • MySQL 8.0.27

cdc_mysql2mysql

MySQL5

I previously used MySQL5 for the most part. Below is the SQL for MySQL5; for details see "Flink MySQL CDC 使用总结" (Flink MySQL CDC usage summary).

set yarn.application.name=cdc_mysql2mysql;
set execution.target=yarn-per-job;
set parallelism.default=1;
set taskmanager.memory.process.size=3g;

set execution.checkpointing.interval=10000;
set state.checkpoints.dir=hdfs:///flink/checkpoints/cdc_mysql2mysql;
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;

CREATE TABLE mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED, -- a primary key is required, and the source table must have one; the Flink primary key may differ from the MySQL primary key
  name string,
  price double,
  ts bigint,
  dt string
) WITH (
  'connector'='mysql-cdc',
  'hostname'='19.168.44.128',
  'port'='3306',
  'username'='root',
  'password'='root-123',
  'database-name'='cdc',
  'table-name'='mysql_cdc_source'
);

create table test_sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts bigint,
  dt string
) with (
  'connector'='jdbc',
  'url'='jdbc:mysql://19.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'username'='root',
  'password'='root-123',
  'table-name'='test_sink_mysql',
  'sink.buffer-flush.max-rows'='1000000'
);

insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;

MySQL8

The same SQL fails on MySQL8:

Caused by: java.lang.ClassCastException: java.math.BigInteger cannot be cast to java.lang.Long
    at com.mysql.jdbc.ConnectionImpl.buildCollationMapping(ConnectionImpl.java:1024)

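At first I suspected that this cdc version did not support MySQL8. The stack trace, however, points into the legacy com.mysql.jdbc driver classes, and it turned out that adding the driver parameter to the jdbc table is enough to fix it: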

'driver' = 'com.mysql.cj.jdbc.Driver'

Complete SQL:

set yarn.application.name=cdc_mysql2mysql;
set execution.target=yarn-per-job;
set parallelism.default=1;
set taskmanager.memory.process.size=3g;

set execution.checkpointing.interval=10000;
set state.checkpoints.dir=hdfs:///flink/checkpoints/cdc_mysql2mysql;
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;

CREATE TABLE mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED, -- a primary key is required, and the source table must have one; the Flink primary key may differ from the MySQL primary key
  name string,
  price double,
  ts bigint,
  dt string
) WITH (
  'connector'='mysql-cdc',
  'hostname'='19.168.44.128',
  'port'='3306',
  'username'='root',
  'password'='root-123',
  'database-name'='cdc',
  'table-name'='mysql_cdc_source'
);

create table test_sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts bigint,
  dt string
) with (
  'connector'='jdbc',
  'url'='jdbc:mysql://19.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'driver'='com.mysql.cj.jdbc.Driver',
  'username'='root',
  'password'='root-123',
  'table-name'='test_sink_mysql',
  'sink.buffer-flush.max-rows'='1000000'
);

insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;

Summary:

  • mysql-cdc: the syntax is the same for MySQL5 and MySQL8
  • jdbc: MySQL8 requires the driver parameter: 'driver' = 'com.mysql.cj.jdbc.Driver'

jdbc_mysql2mysql

MySQL8

Based on the cdc_mysql2mysql experience above, both the source and the sink in jdbc_mysql2mysql should get the driver parameter:

set yarn.application.name=jdbc_mysql2mysql;
set execution.target=yarn-per-job;

create table mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts bigint,
  dt string
) with (
  'connector'='jdbc',
  'url'='jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'driver'='com.mysql.cj.jdbc.Driver',
  'username'='root',
  'password'='root-123',
  'table-name'='mysql_cdc_source'
);


create table test_sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts bigint,
  dt string
) with (
  'connector'='jdbc',
  'url'='jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'driver'='com.mysql.cj.jdbc.Driver',
  'username'='root',
  'password'='root-123',
  'table-name'='test_sink_mysql',
  'sink.buffer-flush.max-rows'='1000000'
);

insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;

I expected this to work, but it fails with:

Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long

The cause turned out to be a field type mismatch: ts was created as int in MySQL, so ts must also be int in Flink SQL, not bigint. The complete, correct SQL is:

set yarn.application.name=jdbc_mysql2mysql;
set execution.target=yarn-per-job;

create table mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts int,
  dt string
) with (
  'connector'='jdbc',
  'url'='jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'driver'='com.mysql.cj.jdbc.Driver',
  'username'='root',
  'password'='root-123',
  'table-name'='mysql_cdc_source'
);


create table test_sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts int,
  dt string
) with (
  'connector'='jdbc',
  'url'='jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'driver'='com.mysql.cj.jdbc.Driver',
  'username'='root',
  'password'='root-123',
  'table-name'='test_sink_mysql',
  'sink.buffer-flush.max-rows'='1000000'
);

insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;
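
For reference, a MySQL-side table consistent with this mapping might look like the sketch below. Only the int type of ts comes from the text; the other column types and the VARCHAR lengths are assumptions chosen to line up with the Flink types used here, not the author's actual DDL.

```sql
-- Hypothetical MySQL DDL for the physical source table (run in MySQL, not in Flink).
-- Only `ts INT` is stated in the article; everything else is an assumption.
CREATE TABLE mysql_cdc_source (
  id    INT PRIMARY KEY,   -- read as Flink int
  name  VARCHAR(255),      -- read as Flink string (length is a guess)
  price DOUBLE,            -- read as Flink double
  ts    INT,               -- must be declared int, not bigint, in the Flink jdbc source
  dt    VARCHAR(32)        -- read as Flink string (length is a guess)
);
```

Running SHOW CREATE TABLE mysql_cdc_source in MySQL shows the real column types to compare against the Flink declaration.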

MySQL5

I then checked whether MySQL5 has the same problem as MySQL8. It does: on MySQL5, declaring ts as bigint produces the same error. The complete, correct SQL is:

set yarn.application.name=jdbc_mysql2mysql;
set execution.target=yarn-per-job;

create table mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts int,
  dt string
) with (
  'connector'='jdbc',
  'url'='jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'username'='root',
  'password'='root-123',
  'table-name'='mysql_cdc_source'
);


create table test_sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts int,
  dt string
) with (
  'connector'='jdbc',
  'url'='jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'username'='root',
  'password'='root-123',
  'table-name'='test_sink_mysql',
  'sink.buffer-flush.max-rows'='1000000'
);

insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;

sink

In cdc_mysql2mysql, ts bigint caused no problem, so I tried changing the type of ts in the sink table to bigint. That worked on both MySQL5 and MySQL8, which means only the jdbc source table is strict about field types.
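
A minimal sketch of that experiment, reusing the connection options from the statements above: the source keeps ts int to match the MySQL column, and only the sink declares ts bigint. The explicit cast in the query is an assumption added here to make the widening visible; the experiment described above only changed the sink column type.

```sql
-- jdbc source: ts stays int so that it matches the MySQL column type
create table mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts int,
  dt string
) with (
  'connector'='jdbc',
  'url'='jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'driver'='com.mysql.cj.jdbc.Driver',
  'username'='root',
  'password'='root-123',
  'table-name'='mysql_cdc_source'
);

-- jdbc sink: ts declared as bigint, the variant reported to work
create table test_sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts bigint,
  dt string
) with (
  'connector'='jdbc',
  'url'='jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'driver'='com.mysql.cj.jdbc.Driver',
  'username'='root',
  'password'='root-123',
  'table-name'='test_sink_mysql',
  'sink.buffer-flush.max-rows'='1000000'
);

-- The explicit cast (an addition for illustration) widens int to bigint before writing
insert into test_sink_mysql(id,name,price,ts,dt)
select id, name, price, cast(ts as bigint) as ts, dt from mysql_cdc_source;
```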

 

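The driver parameter

  • jdbc: with MySQL5 the driver parameter is optional, and the job also runs with it set; with MySQL8 it is required. Setting it in both cases means there is no need to distinguish between MySQL versions.
  • cdc: the driver parameter is not supported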

 

Field type mapping

Official documentation:

  • jdbc: https://nightlies.apache.org/flink/flink-docs-release-2.0/zh/docs/connectors/table/jdbc/#%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B%E6%98%A0%E5%B0%84
  • cdc: https://nightlies.apache.org/flink/flink-cdc-docs-release-3.3/zh/docs/connectors/flink-sources/mysql-cdc/#%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B%E6%98%A0%E5%B0%84

MySQL8 compatibility

So Flink jdbc and mysql-cdc both work with MySQL5 and MySQL8. The corresponding jars are:

  • jdbc: flink-connector-jdbc-1.15.3.jar
  • cdc: flink-sql-connector-mysql-cdc-2.3.0.jar

Only these two jars are needed; no additional mysql-connector-java jar is required.

For cdc 3.1 and above, however, an additional mysql-connector-java jar is required. See the official documentation for details: https://nightlies.apache.org/flink/flink-cdc-docs-release-3.3/zh/docs/connectors/flink-sources/mysql-cdc/

cdc version support

Official documentation: https://nightlies.apache.org/flink/flink-cdc-docs-release-3.3/zh/docs/connectors/flink-sources/overview/

 

