什么是Stream
流是无限制的和连续的实时数据包。数据包通常以键值对的形式产生。生产者自动传输这些数据包,意味着不需要发出请求。
什么是Kafka Stream
Kafka Streams是Apache Kafka社区的项目之一。它是一个用于构建数据管道和微服务的客户端库。输入和输出的数据存储在Kafka集群中。它还在Kafka消费者客户端的基础上提供实时的流处理。流使得从Kafka主题转换或过滤数据并将其发布到另一个Kafka主题变得容易。
Kafka Stream API
Kafka Stream APIs提供了数据并行性、容错性和可扩展性。它将消息作为无界的、连续的和实时的记录流来处理,具有以下特点:
- 单个流可以消费和生产。
- 不支持批处理。
- 支持无状态和有状态操作。
- 除了Kafka自身没有额外依赖。
- 采用一次处理一条记录的方式,以实现毫秒级的处理延迟。
依赖包
为了实现例子,我们在build.sbt中加入kafka stream api:
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.8.0"
代码
要编写Kafka stream应用,先导入下面包:
import org.apache.kafka.streams.KafkaStreams import org.apache.kafka.streams.StreamsConfig._ import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder}
配置一些必要的Stream属性:
val streamsConfiguration = new Properties() streamsConfiguration.put(APPLICATION_ID_CONFIG, "Kafka-Streams-Example") streamsConfiguration.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") streamsConfiguration.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName) streamsConfiguration.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
首先创建KStreamBuilder实例,builder对象拥有stream方法接收输入主题名字并返回一个kstream对象实例,然后在kstream对象上可以应用各种方法,如map、join,从而返回另一个kstream对象,可以写到输出kafka的主题。
val builder = new KStreamBuilder
val kStream = builder.stream("Input_Topic")
val upperCaseKStream = kStream.mapValues(_.toLowerCase)
upperCaseKStream.to("Output_Topic")
val stream = new KafkaStreams(builder, streamsConfiguration)
stream.start()
原文标题:Kafka Streams
原文作者:Amarjeet Singh
原文地址:https://blog.knoldus.com/kafka-streams/
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




