暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

Zio-Kafka简介

原创 简单 2022-09-16
1036

image.png

在本文中,我们讨论使用zio-kafka库从kafka中读取消息和处理消息。

Apache Kafka是一个分布式发布-订阅消息系统,并且是个健壮的队列,可以处理大量的数据,并使你能够将消息从一个终端传递到另一个终端。

ZIO是一个零依赖性的库,用于Scala中的异步和并发编程。它是Scala中的一个函数式副作用系统。

ZIO Kafka库为Kafka客户端提供了个纯函数、基于流的接口。它可以轻易的与ZIO,ZIO Streams集成。

这个博客需要有ziokafka的基本知识。

依赖库

在项目的build.sbt文件中添加zio kafka的依赖。

libraryDependencies ++= Seq(
  "dev.zio" %% "zio-kafka" % "0.15.0"
)

docker-compose.yml配置

启动Kafka服务器,需要先启动Zookeeper服务器。

我们可以在docker-compose.yml文件中配置这个依赖关系,这将确保Zookeeper服务器总是在Kafka服务器之前启动,并在它之后停止。

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

这个设置中,Zookeeper服务器在port=2181上监听。

kafka服务器通过端口29092暴露给主机应用程序,但它实际上是在由KAFKA_ADVERTISED_LISTENERS属性配置在容器环境中的端口9092上运行。

通过使用docker-compose命令来启动Kafka服务器:

$ docker-compose up -d

消费者程序

首先,我们将创建的Kafka消费者看成一个ZLayer,并把它提供给我们主程序中的消费者流。

让我们了解一下ZioKafkaConsumer应用,首先我们在managedConsumer中创建所需的配置,然后我们完成Kafka消费者。由于每个消费者都拥有一个内部连接池来连接到Broker,我们不希望在失败的情况下泄露这样一个连接池。

val managedConsumer = Consumer.make(ConsumerSettings(List("localhost:9092")) .withGroupId("zio-group")) val consumer = ZLayer.fromManaged(managedConsumer)

我们使用ZManaged[R, E, A],它是一个数据结构,用R来封装A型资源的获取和释放,并可能因E型的错误而失败。

之后,我们从managedConsumer创建ZLayer作为消费者。

把消息当作流一样消费

现在我们使用ZStream[R, E, A]来消费来自kafka主题的消息,它表示需要环境R来执行的有效流,最终以E类型的错误失败,并产生A类型的值。

为了创建流,我们需要将消费者订阅到Kafka主题,并配置key和value的字节。

在订阅了kafka主题后,我们使用plainStream方法来处理消息。

plainStream方法需要两个Serde作为参数,第一个是key,第二个是消息的value。

val streams = Consumer.subscribeAnd(Subscription.topics("kafkaTopic")) .plainStream(Serde.string,Serde.string) .map(cr => (cr.value,cr.offset)) .tap { case (value, _ ) => zio.console.putStrLn(s"| $value |") } .map(_._2) //stream of offsets .aggregateAsync(Consumer.offsetBatches)

Apache Kafka引入了Serde的概念,它代表了_ser_ializer和_de_serializer。在将读取的消息materialization为流的过程中,我们为消息的key和value都指定了适合的Serde类型。

然后我们将这些CommitableRecord[K,V]对映射成一个记录值和偏移量的元组。

接下来,我们使用tap,它用来为流中每个元素的消费添加副作用。

并使用zio.console以函数式方式将数值打印到控制台。

之后,我们使用Consumer.offsetBatches转换器,通过使用aggregateAsync方法来合并输入的偏移。

只要流的下游操作符繁忙,它就使用提供的接收器聚合流的元素。

接下来我们使用ZSink的foreach函数,该函数用于将一个副作用函数应用到sink emit的所有值。

val streamEffect = streams.run(ZSink.foreach((offset => offset.commit)))

运行消费者程序

现在我们的消费者已经准备好从Kafka主题读取消息,所以定义zio.App的运行方法,通过提供ZLayer和副作用的动作来执行我们的应用程序。

override def run(args: List[String]) = streamEffect.provideSomeLayer(consumer ++ zio.console.Console.live).exitCode

生产者程序

现在我们创建生产者应用程序,用于生产消息到kafka主题。

就像我们在消费者应用中定义managedConsumer一样,我们也要定义mangedProducer,并进行必要的配置。

但在定义ProducerSettings时有一点不同,因为它需要serde来序列化和反序列化要生产消息的键和值。

val managedProducer = Producer.make(ProducerSettings(List("localhost:9092")),Serde.string,Serde.string) val producer: ZLayer[Blocking, Throwable, Producer[Any, String,String]] = ZLayer.fromManaged(managedProducer)

之后,我们使用提供ZlayermanageProducer来定义生产者,与消费者类似。

接下来我们使用ProducerRecord来提供要生产的消息。

它创建了一条记录发送到kafka,它需要topic、key、value来生成。

val record = new ProducerRecord("kafkaTopic","key-1","abc")

运行生产者程序

最后,我们使用Producer.produce方法发送消息。

然后,我们通过覆写zio.App的run方法来运行生产者应用程序,类似于消费者应用程序。

val producerEffect: ZIO[Producer[Any, String,String],Throwable, RecordMetadata] = Producer.produce(record) override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = producerEffect.provideSomeLayer(producer).exitCode

注:-首先运行消费者应用程序,然后运行生产者应用程序,以便在消费者应用程序中消费消息。

完整的代码在这个仓库中。

结论

在这篇博客中,我们使用zio-kafka建立了基本的应用程序,在docker的帮助下生产和消费消息。我们制作了一个docker-compose.yml文件来启动zookeeper和Kafka服务。然后,我们专注于消费者,并学习了如何订阅主题。

参考

Ziverge:https://ziverge.com/

原文标题:Introduction to Zio-Kafka
原文作者:Akash Kumar
原文地址:https://blog.knoldus.com/introduction-to-zio-kafka/

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论