背景
最近在调研Mysql+Canal+Kafka+Greenplum 架构的时候,卡到了kafka接收消息乱码的问题。各种资料各种查,总是在关键的位置没有后续了@_@。最后也是在偶然的情况下把问题解决了。自己做个总结,也为后面遇到同种问题的墨友提供一个解决思路。(^_^)∠※ =>拿走不谢
问题:
我同时也在问答榜上提问过:问答榜地址
在此感谢耐心回答的李宏达墨友

问题复现
- 修改参数文件:
[root@CanalKafka bin]# pwd
/usr/local/canal-1.1.3/deployer/bin
[root@CanalKafka bin]# vim ../conf/canal.properties
#传输格式:json
canal.mq.flatMessage = false
- 重启canal server
[root@CanalKafka bin]# sh restart.sh
CanalKafka: stopping canal 12286 ...
- mysql 插入一条数据
mysql> insert into t1 values(5,'b');
Query OK, 1 row affected (0.00 sec)
- kafka 接收信息
[root@CanalKafka bin]# sh kafka-console-consumer.sh --bootstrap-server CanalKafka:9092 --topic example --from-beginning
*=;
mysql-bin.000003*UTF-80BJP1343
*
mysql-bin.000003*UTF-80BJPH
mysql-bin.000003*UTF-80BpydbJt1P+Xb
_-+_C-+++1{Pb?id (0B4Ri++(11)
+a+e (0BbR
+a_cha_(100);
- canal.mq.flatMessage 参数修改为true
[root@CanalKafka bin]# vim ../conf/canal.properties
#传输格式:json
canal.mq.flatMessage = true
- 重启canal server
[root@CanalKafka bin]# sh restart.sh
CanalKafka: stopping canal 12286 ...
- mysql 插入再插入一条数据
mysql> insert into t1 values(6,'b');
Query OK, 1 row affected (0.00 sec)
- kafka 接收信息
[root@CanalKafka bin]# sh kafka-console-consumer.sh --bootstrap-server CanalKafka:9092 --topic example --from-beginning

依然是乱码,我卡到的就是这个位置。怎么查资料都说是修改:canal.mq.flatMessage = true 就能解决。但我修改了也重启canal 服务了,但还是乱码,一个头两个大(×_×)。实现没办法了,于是我把zookeeper 及kafka 日志及消息数据全部清除,重新试了一把。
解决
- 停掉zookeeper
[root@CanalKafka bin]# ps -ef | grep zookeeper-
root 9571 1 0 10月28 ? 00:04:40 /bin/java -server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -XX:PermSize=96m
....略
root 12379 1 1 16:13 pts/0 00:00:26 /bin/java -server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -XX:PermSize=96m
...略
[root@CanalKafka bin]# kill -9 9571
[root@CanalKafka bin]# kill -9 12379
[root@CanalKafka bin]# ps -ef | grep zookeeper
root 12610 12474 0 16:46 pts/1 00:00:00 grep --color=auto zookeeper
-删除zookeeper数据
# 查看目录:
[root@CanalKafka config]# vim zookeeper.properties
dataDir=/usr/local/kafka_2.13-2.6.0/zookeeper_data
dataLogDir=/usr/local/kafka_2.13-2.6.0/zookeeper_logs
[root@CanalKafka kafka_2.13-2.6.0]# pwd
/usr/local/kafka_2.13-2.6.0
[root@CanalKafka kafka_2.13-2.6.0]# cd zookeeper_data/
[root@CanalKafka zookeeper_data]# rm -rf *
[root@CanalKafka kafka_2.13-2.6.0]# cd zookeeper_logs/
[root@CanalKafka zookeeper_data]# rm -rf *
- 重启zookeeper服务
[root@CanalKafka bin]# sh zookeeper-server-start.sh -daemon ../config/zookeeper.properties
[root@CanalKafka bin]# ps -ef | grep zookeeper
root 12967 1 8 16:50 pts/1 00:00:01 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHe
省略......
- 清理Kafka日志及消息并启动kafka, 因为kafka 是依赖 zookeeper 服务的,zookeeper 关闭后,kafka 会自己宕掉的。
-- 清理日志及消息
[root@CanalKafka kafka-logs]# pwd
/usr/local/kafka_2.13-2.6.0/kafka-logs
[root@CanalKafka kafka-logs]#rm -rf *
[root@CanalKafka logs]# pwd
/usr/local/kafka_2.13-2.6.0/logs
[root@CanalKafka logs]# rm -rf *
[root@CanalKafka bin]# sh kafka-server-start.sh -daemon ../config/server.properties
[root@CanalKafka bin]# ps -ef | grep kafka
root 12967 1 0 16:50 pts/1 00:00:01 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHe
省略......
- mysql 再再插入一条数据
mysql> insert into t1 values(7,'b');
Query OK, 1 row affected (0.01 sec)
- 查看kafka 消息
[root@CanalKafka bin]# sh kafka-console-consumer.sh --bootstrap-server CanalKafka:9092 --topic example --from-beginning
{"data":[{"id":"7","name":"b"}],"database":"pydb","es":1667037519000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(100)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"t1","ts":1667037503133,"type":"INSERT"}
谈谈感受:
1、Canal 服务的canal.mq.flatMessage = false/true 参数重置后,重启Canal服务并没有解决后续消息乱码问题。所以清空zookeeper与kafka消息后得以解决。
这里有个小猜测:可能是前面的消息是乱码格式,导致后进来的正常数据被影响到了。
本人刚接触canal+kafka框架,后续有新的发现再跟进同步。
2、声明:本环境为调研测试环境,所以解决问题比较粗暴,仅提供解决思路。生产环境谨慎操作。
文章推荐
《Oracle_索引重建—优化索引碎片》
《Oracle 自动收集统计信息机制》
《Oracle 脚本实现简单的审计功能》
《oracle 监控表空间脚本 每月10号0点至06点不报警》
《DBA_TAB_MODIFICATIONS表的刷新策略测试》
《FY_Recover_Data.dbf》
《Oracle RAC 集群迁移文件操作.pdf》
《Oracle Date 字段索引使用测试.dbf》
《Oracle 诊断案例 :因应用死循环导致的CPU过高》
《Oracle 慢SQL监控脚本》
《Oracle 慢SQL监控测试及监控脚本.pdf》
《记录一起索引rebuild与收集统计信息的事故》
《RAC DG删除备库redo时报ORA-01623》
《ASH报告发现:os thread startup 等待事件分析》
《问答榜上引发的Oracle并行的探究(一)》
《问答榜上引发的Oracle并行的探究(二)》
欢迎赞赏支持或留言指正




