一、环境
macOS catalina 10.15.4
kafka 2.4.1
zookeeper 3.5.7
二、安装
mac下安装软件最方便的还是软件包管理器 brew,通过brew 安装,会将其依赖的软件安装好
brew install kafkaUpdating Homebrew...==> Installing dependencies for kafka: zookeeper==> Installing kafka dependency: zookeeper==> Downloading https://mirrors.aliyun.com/homebrew/homebrew-bottles/bottles/zookeeper-3.5.7.catalina.bottle.tar.gzAlready downloaded: Users/samyang/Library/Caches/Homebrew/downloads/5f41691708449ee5d1863e9928ec209d3451b184ab92e631da417c4397f0e87b--zookeeper-3.5.7.catalina.bottle.tar.gz==> Pouring zookeeper-3.5.7.catalina.bottle.tar.gz==> CaveatsTo have launchd start zookeeper now and restart at login:brew services start zookeeperOr, if you don't want/need a background service you can just run:zkServer start==> Summary🍺 usr/local/Cellar/zookeeper/3.5.7: 394 files, 11.3MB==> Installing kafka==> Downloading https://mirrors.aliyun.com/homebrew/homebrew-bottles/bottles/kafka-2.4.1.catalina.bottle.tar.gzAlready downloaded: Users/samyang/Library/Caches/Homebrew/downloads/9f9065f6a56c2981c2e9fdbcf6f959797b52c141216b6d1fa99a9013d558b7f7--kafka-2.4.1.catalina.bottle.tar.gz==> Pouring kafka-2.4.1.catalina.bottle.tar.gz==> CaveatsTo have launchd start kafka now and restart at login:brew services start kafkaOr, if you don't want/need a background service you can just run:zookeeper-server-start usr/local/etc/kafka/zookeeper.properties & kafka-server-start usr/local/etc/kafka/server.properties==> Summary🍺 usr/local/Cellar/kafka/2.4.1: 186 files, 59.5MB==> Caveats==> zookeeperTo have launchd start zookeeper now and restart at login:brew services start zookeeperOr, if you don't want/need a background service you can just run:zkServer start==> kafkaTo have launchd start kafka now and restart at login:brew services start kafkaOr, if you don't want/need a background service you can just run:zookeeper-server-start usr/local/etc/kafka/zookeeper.properties & kafka-server-start usr/local/etc/kafka/server.properties
三、启动
通过brew 方式启动kafka,会自动将zookeeper一起启动
brew services start kafka四、 kafka基本操作
4.1、坑和填坑
在做下面简单实验时会发现,kafka的 broker连不上,报下面错误
[2020-06-04 22:22:26,721] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
查看端口 ,端口不通,说明kafka没有正常运行
telnet localhost 9092Trying ::1...telnet: connect to address ::1: Connection refusedTrying 127.0.0.1...telnet: connect to address 127.0.0.1: Connection refusedtelnet: Unable to connect to remote host
查看kafka日志 发现zookeeper 连接有问题
zookeeper.ClientCnxn) [2020-06-04 22:30:32,337] INFO Socket error occurred: localhost/0:0:0:0:0:0:0:1:2181: Connection refused (org.apache.zookeeper.ClientCnxn)
查看zookeeper日志,发现 如下问题
2020-06-04 22:01:21 NIOServerCnxn [WARN] Unable to read additional data from client sessionid 0x1002d5b13530000, likely client has closed socket2020-06-04 22:33:02 QuorumPeerMain [WARN] Either no config or no quorum defined in config, running in standalone mode2020-06-04 22:33:02 ContextHandler [WARN] o.e.j.s.ServletContextHandler@6adede5{/,null,UNAVAILABLE} contextPath ends with *2020-06-04 22:33:02 ContextHandler [WARN] Empty contextPath2020-06-04 22:33:02 NIOServerCnxnFactory [ERROR] Thread Thread[main,5,main] diedjava.lang.NoSuchMethodError: java.nio.ByteBuffer.clear()Ljava/nio/ByteBuffer;at org.apache.jute.BinaryOutputArchive.stringToByteBuffer(BinaryOutputArchive.java:77)at org.apache.jute.BinaryOutputArchive.writeString(BinaryOutputArchive.java:107)at org.apache.zookeeper.txn.CreateTxn.serialize(CreateTxn.java:77)at org.apache.zookeeper.server.util.SerializeUtils.serializeRequest(SerializeUtils.java:162)at org.apache.zookeeper.server.ZKDatabase.addCommittedProposal(ZKDatabase.java:280)at org.apache.zookeeper.server.ZKDatabase.addCommittedProposal(ZKDatabase.java:258)at org.apache.zookeeper.server.ZKDatabase.access$000(ZKDatabase.java:65)at org.apache.zookeeper.server.ZKDatabase$1.onTxnLoaded(ZKDatabase.java:229)at org.apache.zookeeper.server.persistence.FileTxnSnapLog.fastForwardFromEdits(FileTxnSnapLog.java:293)at org.apache.zookeeper.server.persistence.FileTxnSnapLog.lambda$restore$0(FileTxnSnapLog.java:229)at org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:253)at org.apache.zookeeper.server.ZKDatabase.loadDataBase(ZKDatabase.java:240)at org.apache.zookeeper.server.ZooKeeperServer.loadData(ZooKeeperServer.java:290)at org.apache.zookeeper.server.ZooKeeperServer.startdata(ZooKeeperServer.java:450)at org.apache.zookeeper.server.NIOServerCnxnFactory.startup(NIOServerCnxnFactory.java:764)at org.apache.zookeeper.server.ServerCnxnFactory.startup(ServerCnxnFactory.java:98)at org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:144)at org.apache.zookeeper.server.ZooKeeperServerMain.initializeAndRun(ZooKeeperServerMain.java:106)at org.apache.zookeeper.server.ZooKeeperServerMain.main(ZooKeeperServerMain.java:64)at org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:128)at org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:82)2020-06-04 22:33:03 NIOServerCnxnFactory [ERROR] Thread Thread[NIOWorkerThread-1,5,main] died
这种问题一般是jar包的方法有问题导致的 ,更深层次需要debug 去查看,此处我们查看zookeeper的jar包,发现 和zookeeper的版本不匹配。zookeeper是3.5.7,而jar包是 3.5.6,怀疑是这个jar包导致的问题。那么我们替换掉即可。网上没有找到3.5.7的 ,用3.5.8的进行了替换

zookeeper 下载地址
https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
删除 zookeeper-3.5.6-SNAPSHOT.jar ,将下载下来的 zookeeper-3.5.8.jar 和 zookeeper-jute-3.5.8.jar 放进去
4.2 列出当前topics
kafka-topics --bootstrap-server=localhost:9092 --list
4.3 查看topic消息内容
kafka-console-consumer --bootstrap-server=hdp002:9092 --topic alp_order --from-beginning
4.4 kafka-python的API简单介绍
这是python 操作kafka的一个模块,具体介绍参考 https://github.com/dpkp/kafka-python
生产者
from kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers=["localhost:9092"])producer.send('test_topic', key=bytes(str('k'), value='value'.encode('utf-8')
消费者
from kafka import KafkaConsumerconsumer = KafkaConsumer("test_topic", bootstrap_servers=["localhost:9092"], auto_offset_reset='latest')for msg in consumer:key = msg.key.decode(encoding="utf-8") #因为接收到的数据时bytes类型,因此需要解码value = msg.value.decode(encoding="utf-8")print("%s-%d-%d key=%s value=%s" % (msg.topic, msg.partition, msg.offset, key, value))
文章转载自三杯酒coO,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




