
在本文中,我们讨论使用zio-kafka库从kafka中读取消息和处理消息。
Apache Kafka是一个分布式发布-订阅消息系统,并且是个健壮的队列,可以处理大量的数据,并使你能够将消息从一个终端传递到另一个终端。
ZIO是一个零依赖性的库,用于Scala中的异步和并发编程。它是Scala中的一个函数式副作用系统。
ZIO Kafka库为Kafka客户端提供了个纯函数、基于流的接口。它可以轻易的与ZIO,ZIO Streams集成。
依赖库
在项目的build.sbt文件中添加zio kafka的依赖。
libraryDependencies ++= Seq(
"dev.zio" %% "zio-kafka" % "0.15.0"
)
docker-compose.yml配置
启动Kafka服务器,需要先启动Zookeeper服务器。
我们可以在docker-compose.yml文件中配置这个依赖关系,这将确保Zookeeper服务器总是在Kafka服务器之前启动,并在它之后停止。
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
这个设置中,Zookeeper服务器在port=2181上监听。
kafka服务器通过端口29092暴露给主机应用程序,但它实际上是在由KAFKA_ADVERTISED_LISTENERS属性配置在容器环境中的端口9092上运行。
通过使用docker-compose命令来启动Kafka服务器:
$ docker-compose up -d
消费者程序
首先,我们将创建的Kafka消费者看成一个ZLayer,并把它提供给我们主程序中的消费者流。
让我们了解一下ZioKafkaConsumer应用,首先我们在managedConsumer中创建所需的配置,然后我们完成Kafka消费者。由于每个消费者都拥有一个内部连接池来连接到Broker,我们不希望在失败的情况下泄露这样一个连接池。
val managedConsumer = Consumer.make(ConsumerSettings(List("localhost:9092")) .withGroupId("zio-group")) val consumer = ZLayer.fromManaged(managedConsumer)
我们使用ZManaged[R, E, A],它是一个数据结构,用R来封装A型资源的获取和释放,并可能因E型的错误而失败。
之后,我们从managedConsumer创建ZLayer作为消费者。
把消息当作流一样消费
现在我们使用ZStream[R, E, A]来消费来自kafka主题的消息,它表示需要环境R来执行的有效流,最终以E类型的错误失败,并产生A类型的值。
为了创建流,我们需要将消费者订阅到Kafka主题,并配置key和value的字节。
在订阅了kafka主题后,我们使用plainStream方法来处理消息。
plainStream方法需要两个Serde作为参数,第一个是key,第二个是消息的value。
val streams = Consumer.subscribeAnd(Subscription.topics("kafkaTopic")) .plainStream(Serde.string,Serde.string) .map(cr => (cr.value,cr.offset)) .tap { case (value, _ ) => zio.console.putStrLn(s"| $value |") } .map(_._2) //stream of offsets .aggregateAsync(Consumer.offsetBatches)
Apache Kafka引入了Serde的概念,它代表了_ser_ializer和_de_serializer。在将读取的消息materialization为流的过程中,我们为消息的key和value都指定了适合的Serde类型。
然后我们将这些CommitableRecord[K,V]对映射成一个记录值和偏移量的元组。
接下来,我们使用tap,它用来为流中每个元素的消费添加副作用。
并使用zio.console以函数式方式将数值打印到控制台。
之后,我们使用Consumer.offsetBatches转换器,通过使用aggregateAsync方法来合并输入的偏移。
只要流的下游操作符繁忙,它就使用提供的接收器聚合流的元素。
接下来我们使用ZSink的foreach函数,该函数用于将一个副作用函数应用到sink emit的所有值。
val streamEffect = streams.run(ZSink.foreach((offset => offset.commit)))
运行消费者程序
现在我们的消费者已经准备好从Kafka主题读取消息,所以定义zio.App的运行方法,通过提供ZLayer和副作用的动作来执行我们的应用程序。
override def run(args: List[String]) = streamEffect.provideSomeLayer(consumer ++ zio.console.Console.live).exitCode
生产者程序
现在我们创建生产者应用程序,用于生产消息到kafka主题。
就像我们在消费者应用中定义managedConsumer一样,我们也要定义mangedProducer,并进行必要的配置。
但在定义ProducerSettings时有一点不同,因为它需要serde来序列化和反序列化要生产消息的键和值。
val managedProducer = Producer.make(ProducerSettings(List("localhost:9092")),Serde.string,Serde.string) val producer: ZLayer[Blocking, Throwable, Producer[Any, String,String]] = ZLayer.fromManaged(managedProducer)
之后,我们使用提供Zlayer的manageProducer来定义生产者,与消费者类似。
接下来我们使用ProducerRecord来提供要生产的消息。
它创建了一条记录发送到kafka,它需要topic、key、value来生成。
val record = new ProducerRecord("kafkaTopic","key-1","abc")
运行生产者程序
最后,我们使用Producer.produce方法发送消息。
然后,我们通过覆写zio.App的run方法来运行生产者应用程序,类似于消费者应用程序。
val producerEffect: ZIO[Producer[Any, String,String],Throwable, RecordMetadata] = Producer.produce(record) override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = producerEffect.provideSomeLayer(producer).exitCode
注:-首先运行消费者应用程序,然后运行生产者应用程序,以便在消费者应用程序中消费消息。
完整的代码在这个仓库中。
结论
在这篇博客中,我们使用zio-kafka建立了基本的应用程序,在docker的帮助下生产和消费消息。我们制作了一个docker-compose.yml文件来启动zookeeper和Kafka服务。然后,我们专注于消费者,并学习了如何订阅主题。
参考
Ziverge:https://ziverge.com/
原文标题:Introduction to Zio-Kafka
原文作者:Akash Kumar
原文地址:https://blog.knoldus.com/introduction-to-zio-kafka/




