暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink sink 到 kafka,并行度与分区的关系

原创 数据库新手-ray 2022-06-14
1118

问题

在社区看到以下问题:


请教个问题哈,sink 到 kafka,采用默认的分区器,是不是每个并行度都会与kafka的partition维护一个连接

比如 10 个并行度,3个 partition,那么维护的连接数总共为 10*3 个

?  是的

还是一个taskManager建立一个生产者 一个生产者对应多个分区

一个taskManager里面多个slot共享一个生产者? no

刚好想起,之前有个分析程序,用 FlinkKafkaProducer 写数据到 kafka,sink 只有一个并行度,sink 的 topic 有多个分区,数据永远只往 分区 0 发送数据

测试程序

来一个简单的测试程序:

  1. 读取 kafka 数据
  2. 来个 map 算子处理一下,在数据上加入当前的 subtask 的 index,标明数据是在哪个并行度处理的
  3. sink 数据到 kafka
  4. 写个简单的 java 消费者程序,讲数据的分区和数据内容输出

通过调整 flink 任务的并行度和 sink 的 topic 的分区数,测试以上问题

flink 程序

读取kafka,map算子给数据加上当前 subtask 数,标明数据是那个并行度的,最后 sink 到 kafka

object KafkaSinkTest {

  val LOG = LoggerFactory.getLogger("KafkaSinkTest")

  def main(args: Array[String]): Unit = {

    val topic = "user_log"
    val sinkTopic = "user_log_sink"

    // env
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // global parllelism
    val parallelism = 1
    env.setParallelism(parallelism)

    // kafka source
    val kafkaSource = KafkaSource.builder[String]()
      .setBootstrapServers(Common.BROKER_LIST)
      .setTopics(topic)
      .setGroupId("KafkaSinkTest")
      .setStartingOffsets(OffsetsInitializer.latest())
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build();

    // kafka sink
    val kafkaSink = KafkaSink
      .builder[String]()
      .setBootstrapServers(bootstrapServer)
      .setKafkaProducerConfig(Common.getProp)
      .setRecordSerializer(KafkaRecordSerializationSchema.builder[String]()
        .setTopic(sinkTopic)
        // 不指定 key 的序列号器,key 会为 空
//        .setKeySerializationSchema(new SimpleStringSchema())
        .setValueSerializationSchema(new SimpleStringSchema())
        .build()
      )
      .build()

    // add source,读取数据
    val sourceStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource")

    // map, add current subtask index
    val mapStream = sourceStream
     // rebalance data to all parallelisn
      .rebalance
      .flatMap(new RichFlatMapFunction[String, String] {
        override def flatMap(element: String, out: Collector[String]): Unit = {
          val parallelism = getRuntimeContext.getIndexOfThisSubtask
          out.collect(parallelism + "," + element)

        }
      })
      .name("flatMap")
      .uid("flatMap")

    // sink to kafka, new api
    mapStream.sinkTo(kafkaSink)

    // sink to kafka, old api
    //    val kafkaProducer = new FlinkKafkaProducer[String](bootstrapServer,sinkTopic, new SimpleStringSchema())
    //    mapStream.addSink(kafkaProducer)
    //      .setParallelism(parallelism)

    env.execute("KafkaSinkTest")
  }
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论