暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

MacOS 安装kafka及填坑

三杯酒coO 2020-06-04
2968

一、环境


  • macOS catalina 10.15.4

  • kafka 2.4.1

  • zookeeper 3.5.7



二、安装


mac下安装软件最方便的还是软件包管理器 brew,通过brew 安装,会将其依赖的软件安装好


    brew install kafka
    Updating Homebrew...
    ==> Installing dependencies for kafka: zookeeper
    ==> Installing kafka dependency: zookeeper
    ==> Downloading https://mirrors.aliyun.com/homebrew/homebrew-bottles/bottles/zookeeper-3.5.7.catalina.bottle.tar.gz
    Already downloaded: Users/samyang/Library/Caches/Homebrew/downloads/5f41691708449ee5d1863e9928ec209d3451b184ab92e631da417c4397f0e87b--zookeeper-3.5.7.catalina.bottle.tar.gz
    ==> Pouring zookeeper-3.5.7.catalina.bottle.tar.gz
    ==> Caveats
    To have launchd start zookeeper now and restart at login:
    brew services start zookeeper
    Or, if you don't want/need a background service you can just run:
    zkServer start
    ==> Summary
    🍺 usr/local/Cellar/zookeeper/3.5.7: 394 files, 11.3MB
    ==> Installing kafka
    ==> Downloading https://mirrors.aliyun.com/homebrew/homebrew-bottles/bottles/kafka-2.4.1.catalina.bottle.tar.gz
    Already downloaded: Users/samyang/Library/Caches/Homebrew/downloads/9f9065f6a56c2981c2e9fdbcf6f959797b52c141216b6d1fa99a9013d558b7f7--kafka-2.4.1.catalina.bottle.tar.gz
    ==> Pouring kafka-2.4.1.catalina.bottle.tar.gz
    ==> Caveats
    To have launchd start kafka now and restart at login:
    brew services start kafka
    Or, if you don't want/need a background service you can just run:
    zookeeper-server-start usr/local/etc/kafka/zookeeper.properties & kafka-server-start usr/local/etc/kafka/server.properties
    ==> Summary
    🍺 usr/local/Cellar/kafka/2.4.1: 186 files, 59.5MB
    ==> Caveats
    ==> zookeeper
    To have launchd start zookeeper now and restart at login:
    brew services start zookeeper
    Or, if you don't want/need a background service you can just run:
    zkServer start
    ==> kafka
    To have launchd start kafka now and restart at login:
    brew services start kafka
    Or, if you don't want/need a background service you can just run:
    zookeeper-server-start usr/local/etc/kafka/zookeeper.properties & kafka-server-start usr/local/etc/kafka/server.properties



    三、启动


    通过brew 方式启动kafka,会自动将zookeeper一起启动


    brew services start kafka



    四、 kafka基本操作



    4.1、坑和填坑


    1. 在做下面简单实验时会发现,kafka的 broker连不上,报下面错误

      • [2020-06-04 22:22:26,721] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
      1. 查看端口 ,端口不通,说明kafka没有正常运行

        • telnet localhost 9092
          Trying ::1...
          telnet: connect to address ::1: Connection refused
          Trying 127.0.0.1...
          telnet: connect to address 127.0.0.1: Connection refused
          telnet: Unable to connect to remote host
        1. 查看kafka日志 发现zookeeper 连接有问题

          • zookeeper.ClientCnxn) [2020-06-04 22:30:32,337] INFO Socket error occurred: localhost/0:0:0:0:0:0:0:1:2181: Connection refused (org.apache.zookeeper.ClientCnxn)
          1. 查看zookeeper日志,发现 如下问题

            • 2020-06-04 22:01:21 NIOServerCnxn [WARN] Unable to read additional data from client sessionid 0x1002d5b13530000, likely client has closed socket
              2020-06-04 22:33:02 QuorumPeerMain [WARN] Either no config or no quorum defined in config, running in standalone mode
              2020-06-04 22:33:02 ContextHandler [WARN] o.e.j.s.ServletContextHandler@6adede5{/,null,UNAVAILABLE} contextPath ends with *
              2020-06-04 22:33:02 ContextHandler [WARN] Empty contextPath
              2020-06-04 22:33:02 NIOServerCnxnFactory [ERROR] Thread Thread[main,5,main] died
              java.lang.NoSuchMethodError: java.nio.ByteBuffer.clear()Ljava/nio/ByteBuffer;
              at org.apache.jute.BinaryOutputArchive.stringToByteBuffer(BinaryOutputArchive.java:77)
              at org.apache.jute.BinaryOutputArchive.writeString(BinaryOutputArchive.java:107)
              at org.apache.zookeeper.txn.CreateTxn.serialize(CreateTxn.java:77)
              at org.apache.zookeeper.server.util.SerializeUtils.serializeRequest(SerializeUtils.java:162)
              at org.apache.zookeeper.server.ZKDatabase.addCommittedProposal(ZKDatabase.java:280)
              at org.apache.zookeeper.server.ZKDatabase.addCommittedProposal(ZKDatabase.java:258)
              at org.apache.zookeeper.server.ZKDatabase.access$000(ZKDatabase.java:65)
              at org.apache.zookeeper.server.ZKDatabase$1.onTxnLoaded(ZKDatabase.java:229)
              at org.apache.zookeeper.server.persistence.FileTxnSnapLog.fastForwardFromEdits(FileTxnSnapLog.java:293)
              at org.apache.zookeeper.server.persistence.FileTxnSnapLog.lambda$restore$0(FileTxnSnapLog.java:229)
              at org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:253)
              at org.apache.zookeeper.server.ZKDatabase.loadDataBase(ZKDatabase.java:240)
              at org.apache.zookeeper.server.ZooKeeperServer.loadData(ZooKeeperServer.java:290)
              at org.apache.zookeeper.server.ZooKeeperServer.startdata(ZooKeeperServer.java:450)
              at org.apache.zookeeper.server.NIOServerCnxnFactory.startup(NIOServerCnxnFactory.java:764)
              at org.apache.zookeeper.server.ServerCnxnFactory.startup(ServerCnxnFactory.java:98)
              at org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:144)
              at org.apache.zookeeper.server.ZooKeeperServerMain.initializeAndRun(ZooKeeperServerMain.java:106)
              at org.apache.zookeeper.server.ZooKeeperServerMain.main(ZooKeeperServerMain.java:64)
              at org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:128)
              at org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:82)
              2020-06-04 22:33:03 NIOServerCnxnFactory [ERROR] Thread Thread[NIOWorkerThread-1,5,main] died
            1. 这种问题一般是jar包的方法有问题导致的 ,更深层次需要debug 去查看,此处我们查看zookeeper的jar包,发现  和zookeeper的版本不匹配。zookeeper是3.5.7,而jar包是 3.5.6,怀疑是这个jar包导致的问题。那么我们替换掉即可。网上没有找到3.5.7的 ,用3.5.8的进行了替换

            1. zookeeper 下载地址
              https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
              删除 zookeeper-3.5.6-SNAPSHOT.jar ,将下载下来的  zookeeper-3.5.8.jar 和 zookeeper-jute-3.5.8.jar 放进去



            4.2 列出当前topics


              kafka-topics --bootstrap-server=localhost:9092 --list



              4.3 查看topic消息内容


                kafka-console-consumer --bootstrap-server=hdp002:9092  --topic alp_order --from-beginning



                4.4 kafka-python的API简单介绍


                这是python 操作kafka的一个模块,具体介绍参考 https://github.com/dpkp/kafka-python


                • 生产者


                  from kafka import KafkaProducer
                  producer = KafkaProducer(bootstrap_servers=["localhost:9092"])
                  producer.send('test_topic', key=bytes(str('k'), value='value'.encode('utf-8')


                  • 消费者


                    from kafka import KafkaConsumer
                    consumer = KafkaConsumer("test_topic", bootstrap_servers=["localhost:9092"], auto_offset_reset='latest')
                    for msg in consumer:
                    key = msg.key.decode(encoding="utf-8") #因为接收到的数据时bytes类型,因此需要解码
                    value = msg.value.decode(encoding="utf-8")
                    print("%s-%d-%d key=%s value=%s" % (msg.topic, msg.partition, msg.offset, key, value))


                    文章转载自三杯酒coO,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                    评论