暂无图片
TiCDC 同步cananl-json格式数据到kafka并使用Flink消费时的问题 开发者&应用适配 TiCDC
我来答
分享
Y
yyyola
2021-05-25
TiCDC 同步cananl-json格式数据到kafka并使用Flink消费时的问题 开发者&应用适配 TiCDC

【TiDB 版本】V5.0.1

【问题描述】
TiCDC写入Kafka(canal-json),然后使用Flink 基于SQL 创建Source 表消费写入MySQL,消费时报数据格式错误,因为CDC捕获数据到一个Topic,Flink消费时会取所有变换表的数据,请问如何通过TiCDC配置让CDC可以一个Table一个Topic?或者一下错误,如何处理比较和合适?
at org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema.deserialize(CanalJsonDeserializationSchema.java:285)
at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 16)): only regular white space (\r, \n, \t) is allowed between tokens
at [Source: UNKNOWN; line: 1, column: 2]
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:688)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._skipWSOrEnd(UTF8StreamJsonParser.java:3012)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:724)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4622)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3056)
at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserializeToJsonNode(JsonRowDataDeserializationSchema.java:117)
at org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema.deserialize(CanalJsonDeserializationSchema.java:213)
… 7 more


若提问为性能优化、故障排查类问题,请下载脚本运行。终端输出的打印结果,请务必全选并复制粘贴上传。

我来答
添加附件
收藏
分享
问题补充
2条回答
默认
最新
陈一宁

目前 ticdc 不支持 一个 schema 对应一个 topic。如果需要这样处理
建议一个 changefeed 对应一个 schema 再进行 相关的处理

另外 changefeed 过多非常容易引起 TiCDC 的内存资源消耗

如果只是数据同步 建议直接使用 ticdc 的 sink-mysql 模式 直接写入到 下游 mysql 数据库

目前 ticdc 在 5.0.1 上存在一个验证的已知问题。建议等到 6月初使用 5.0.2 版本的 tidb 集群+ticdc

暂无图片 评论
暂无图片 有用 0
陈一宁

ticdc 的 sink 是支持 很多种 message format 的

包括 canal-json maxwell 等等
如果是自定义的 消息类型 需要 按照 标准的开放数据协议来进行 入库程序的开发

暂无图片 评论
暂无图片 有用 0
回答交流
提交
问题信息
请登录之后查看
邀请回答
暂无人订阅该标签,敬请期待~~
暂无图片墨值悬赏