问题
在社区看到以下问题:
请教个问题哈,sink 到 kafka,采用默认的分区器,是不是每个并行度都会与kafka的partition维护一个连接
比如 10 个并行度,3个 partition,那么维护的连接数总共为 10*3 个
? 是的
还是一个taskManager建立一个生产者 一个生产者对应多个分区
一个taskManager里面多个slot共享一个生产者? no
刚好想起,之前有个分析程序,用 FlinkKafkaProducer 写数据到 kafka,sink 只有一个并行度,sink 的 topic 有多个分区,数据永远只往 分区 0 发送数据
测试程序
来一个简单的测试程序:
- 读取 kafka 数据
- 来个 map 算子处理一下,在数据上加入当前的 subtask 的 index,标明数据是在哪个并行度处理的
- sink 数据到 kafka
- 写个简单的 java 消费者程序,讲数据的分区和数据内容输出
通过调整 flink 任务的并行度和 sink 的 topic 的分区数,测试以上问题
flink 程序
读取kafka,map算子给数据加上当前 subtask 数,标明数据是那个并行度的,最后 sink 到 kafka
object KafkaSinkTest {
val LOG = LoggerFactory.getLogger("KafkaSinkTest")
def main(args: Array[String]): Unit = {
val topic = "user_log"
val sinkTopic = "user_log_sink"
// env
val env = StreamExecutionEnvironment.getExecutionEnvironment
// global parllelism
val parallelism = 1
env.setParallelism(parallelism)
// kafka source
val kafkaSource = KafkaSource.builder[String]()
.setBootstrapServers(Common.BROKER_LIST)
.setTopics(topic)
.setGroupId("KafkaSinkTest")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
// kafka sink
val kafkaSink = KafkaSink
.builder[String]()
.setBootstrapServers(bootstrapServer)
.setKafkaProducerConfig(Common.getProp)
.setRecordSerializer(KafkaRecordSerializationSchema.builder[String]()
.setTopic(sinkTopic)
// 不指定 key 的序列号器,key 会为 空
// .setKeySerializationSchema(new SimpleStringSchema())
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.build()
// add source,读取数据
val sourceStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource")
// map, add current subtask index
val mapStream = sourceStream
// rebalance data to all parallelisn
.rebalance
.flatMap(new RichFlatMapFunction[String, String] {
override def flatMap(element: String, out: Collector[String]): Unit = {
val parallelism = getRuntimeContext.getIndexOfThisSubtask
out.collect(parallelism + "," + element)
}
})
.name("flatMap")
.uid("flatMap")
// sink to kafka, new api
mapStream.sinkTo(kafkaSink)
// sink to kafka, old api
// val kafkaProducer = new FlinkKafkaProducer[String](bootstrapServer,sinkTopic, new SimpleStringSchema())
// mapStream.addSink(kafkaProducer)
// .setParallelism(parallelism)
env.execute("KafkaSinkTest")
}
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




