暂无图片
暂无图片
3
暂无图片
暂无图片
暂无图片

mysql+canal+kafka 接收消息乱码问题

原创 布衣 2022-11-01
1546

背景

  最近在调研Mysql+Canal+Kafka+Greenplum 架构的时候,卡到了kafka接收消息乱码的问题。各种资料各种查,总是在关键的位置没有后续了@_@。最后也是在偶然的情况下把问题解决了。自己做个总结,也为后面遇到同种问题的墨友提供一个解决思路。(^_^)∠※ =>拿走不谢

问题:

  我同时也在问答榜上提问过:问答榜地址
  在此感谢耐心回答的李宏达墨友
image.png

问题复现

  • 修改参数文件:
[root@CanalKafka bin]# pwd
/usr/local/canal-1.1.3/deployer/bin
[root@CanalKafka bin]# vim ../conf/canal.properties 
#传输格式:json
canal.mq.flatMessage = false
  • 重启canal server
[root@CanalKafka bin]# sh restart.sh 
CanalKafka: stopping canal 12286 ... 
  • mysql 插入一条数据
mysql> insert into t1 values(5,'b'); 
Query OK, 1 row affected (0.00 sec)
  • kafka 接收信息
[root@CanalKafka bin]#  sh kafka-console-consumer.sh --bootstrap-server CanalKafka:9092 --topic example --from-beginning   
*=;
mysql-bin.000003*UTF-80BJP1343
*

   mysql-bin.000003*UTF-80BJPH 
mysql-bin.000003*UTF-80BpydbJt1P+Xb
        _-+_C-+++1{Pb?id (0B4Ri++(11)
                                                       +a+e (0BbR
                                                                     +a_cha_(100);
  • canal.mq.flatMessage 参数修改为true
[root@CanalKafka bin]# vim ../conf/canal.properties 
#传输格式:json
canal.mq.flatMessage = true
  • 重启canal server
[root@CanalKafka bin]# sh restart.sh 
CanalKafka: stopping canal 12286 ... 
  • mysql 插入再插入一条数据
mysql> insert into t1 values(6,'b'); 
Query OK, 1 row affected (0.00 sec)
  • kafka 接收信息
[root@CanalKafka bin]#  sh kafka-console-consumer.sh --bootstrap-server CanalKafka:9092 --topic example --from-beginning   

image.png
依然是乱码,我卡到的就是这个位置。怎么查资料都说是修改:canal.mq.flatMessage = true 就能解决。但我修改了也重启canal 服务了,但还是乱码,一个头两个大(×_×)。实现没办法了,于是我把zookeeper 及kafka 日志及消息数据全部清除,重新试了一把。

解决

  • 停掉zookeeper
[root@CanalKafka bin]# ps -ef | grep zookeeper-
root      9571     1  0 10月28 ?      00:04:40 /bin/java -server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -XX:PermSize=96m 
....略
root     12379     1  1 16:13 pts/0    00:00:26 /bin/java -server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -XX:PermSize=96m
...略
[root@CanalKafka bin]# kill -9 9571
[root@CanalKafka bin]# kill -9 12379
[root@CanalKafka bin]# ps -ef | grep zookeeper
root     12610 12474  0 16:46 pts/1    00:00:00 grep --color=auto zookeeper

-删除zookeeper数据

# 查看目录:
[root@CanalKafka config]# vim zookeeper.properties 
dataDir=/usr/local/kafka_2.13-2.6.0/zookeeper_data
dataLogDir=/usr/local/kafka_2.13-2.6.0/zookeeper_logs
[root@CanalKafka kafka_2.13-2.6.0]# pwd
/usr/local/kafka_2.13-2.6.0
[root@CanalKafka kafka_2.13-2.6.0]# cd zookeeper_data/
[root@CanalKafka zookeeper_data]# rm -rf *
[root@CanalKafka kafka_2.13-2.6.0]# cd zookeeper_logs/
[root@CanalKafka zookeeper_data]# rm -rf *
  • 重启zookeeper服务
[root@CanalKafka bin]# sh zookeeper-server-start.sh -daemon ../config/zookeeper.properties 
[root@CanalKafka bin]# ps -ef | grep zookeeper
root     12967     1  8 16:50 pts/1    00:00:01 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHe
省略......
  • 清理Kafka日志及消息并启动kafka, 因为kafka 是依赖 zookeeper 服务的,zookeeper 关闭后,kafka 会自己宕掉的。
-- 清理日志及消息
[root@CanalKafka kafka-logs]# pwd
/usr/local/kafka_2.13-2.6.0/kafka-logs
[root@CanalKafka kafka-logs]#rm -rf *
[root@CanalKafka logs]# pwd
/usr/local/kafka_2.13-2.6.0/logs
[root@CanalKafka logs]# rm -rf *
[root@CanalKafka bin]# sh kafka-server-start.sh -daemon ../config/server.properties 
[root@CanalKafka bin]# ps -ef | grep kafka
root     12967     1  0 16:50 pts/1    00:00:01 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHe
省略......
  • mysql 再再插入一条数据
mysql> insert into t1 values(7,'b'); 
Query OK, 1 row affected (0.01 sec)
  • 查看kafka 消息
[root@CanalKafka bin]#  sh kafka-console-consumer.sh --bootstrap-server CanalKafka:9092 --topic example --from-beginning  
{"data":[{"id":"7","name":"b"}],"database":"pydb","es":1667037519000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(100)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"t1","ts":1667037503133,"type":"INSERT"}

谈谈感受:

1、Canal 服务的canal.mq.flatMessage = false/true 参数重置后,重启Canal服务并没有解决后续消息乱码问题。所以清空zookeeper与kafka消息后得以解决。
 这里有个小猜测:可能是前面的消息是乱码格式,导致后进来的正常数据被影响到了。
 本人刚接触canal+kafka框架,后续有新的发现再跟进同步。
2、声明:本环境为调研测试环境,所以解决问题比较粗暴,仅提供解决思路。生产环境谨慎操作。

欢迎赞赏支持或留言指正

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论