How Many of These Six Ways to Read and Write Doris with Flink Do You Know?

大数据技能圈 (Big Data Skills Circle), 2023-09-14

A Closer Look at Connecting Flink to Doris

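The Flink Doris Connector lets Flink read, insert, update, and delete data stored in Doris. Below we show how to work with Doris from Flink through both the DataStream API and SQL.

Updates and deletes are supported only on tables that use the Unique Key model.

Currently, deletes are applied automatically when data is ingested through Flink CDC. With any other ingestion method, you have to implement deletion yourself.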
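On Unique Key tables that support batch delete, Doris provides a hidden column `__DORIS_DELETE_SIGN__`. Loading a row with this column set to 1 deletes the existing row with the same key. One way to implement deletion yourself is to carry that flag in every record.

The sketch below assumes a hypothetical table with columns `id` and `name`, and the class and method names are illustrative only. It builds one JSON line per change. The lines would then presumably be written through a sink configured with `setDeletable(true)` and the JSON Stream Load properties used later in this article.

```java
/** Hypothetical helper: encodes upserts and deletes for a Unique Key table as JSON lines. */
final class DeleteSignEncoder {
    // Doris' hidden delete-flag column on Unique Key tables (1 = delete, 0 = upsert).
    static final String DELETE_SIGN = "__DORIS_DELETE_SIGN__";

    /** Builds one JSON object for an assumed (id, name) table. */
    static String toJsonLine(long id, String name, boolean deleted) {
        // Minimal escaping of backslashes and quotes; a real job would use a JSON library.
        String escaped = name.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"id\":" + id + ",\"name\":\"" + escaped + "\",\""
                + DELETE_SIGN + "\":" + (deleted ? 1 : 0) + "}";
    }

    public static void main(String[] args) {
        System.out.println(toJsonLine(1L, "doris", false)); // upsert of key 1
        System.out.println(toJsonLine(1L, "doris", true));  // delete of key 1
    }
}
```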

1

Version Compatibility

2

Usage

01

Maven

Add the flink-doris-connector dependency:

    <!-- flink-doris-connector -->
    <dependency>
        <groupId>org.apache.doris</groupId>
        <artifactId>flink-doris-connector-1.16</artifactId>
        <version>1.4.0</version>
    </dependency>

Replace the connector artifact and the Flink dependency versions to match the Flink version you use.
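The artifact ID embeds the Flink minor version, for example `flink-doris-connector-1.16` for Flink 1.16. Assuming that naming pattern holds for your Flink release, the hypothetical helper below derives the artifact ID from a full Flink version string. Still check which connector releases actually exist for your Flink version.

```java
/** Hypothetical helper: derives the connector artifactId from a Flink version such as "1.16.2". */
final class ConnectorArtifact {
    static String artifactIdFor(String flinkVersion) {
        String[] parts = flinkVersion.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected <major>.<minor>[.<patch>], got: " + flinkVersion);
        }
        // Only major.minor is part of the artifactId; any patch version is dropped.
        return "flink-doris-connector-" + parts[0] + "." + parts[1];
    }

    public static void main(String[] args) {
        System.out.println(artifactIdFor("1.16.2")); // flink-doris-connector-1.16
    }
}
```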

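02

Build

To build the connector from source, run sh build.sh.

After a successful build, the target jar (for example flink-doris-connector-1.5.0-SNAPSHOT.jar) is generated in the dist directory. Copy this file onto Flink's classpath to use the Flink-Doris-Connector. For Flink running in Local mode, put the file in the lib/ folder; for Flink running in Yarn cluster mode, put it into the pre-deployment package.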
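For a Local-mode installation, the copy step can be scripted. The sketch below rests on two assumptions. First, the build output is in `dist/`. Second, the `FLINK_HOME` environment variable points at the Flink installation. The class name is hypothetical.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Iterator;

/** Hypothetical installer: copies the built connector jar from dist/ into <flinkHome>/lib. */
final class ConnectorInstaller {
    static Path install(Path distDir, Path flinkHome) throws IOException {
        Path libDir = Files.createDirectories(flinkHome.resolve("lib"));
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(distDir, "flink-doris-connector-*.jar")) {
            Iterator<Path> it = jars.iterator();
            if (!it.hasNext()) {
                throw new IOException("No flink-doris-connector-*.jar found in " + distDir);
            }
            Path jar = it.next(); // assumes dist/ holds a single connector jar
            // Overwrite an existing file of the same name so re-running the copy is safe.
            return Files.copy(jar, libDir.resolve(jar.getFileName()), StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        String flinkHome = System.getenv("FLINK_HOME"); // assumption: points at the local Flink install
        if (flinkHome == null) {
            System.err.println("Set FLINK_HOME first");
            return;
        }
        System.out.println("Copied to " + install(Paths.get("dist"), Paths.get(flinkHome)));
    }
}
```

For Yarn cluster mode the jar would instead go into whatever pre-deployment package your setup ships, which this sketch does not cover.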

3

Usage Examples

01

Read

SQL

    -- doris source
    CREATE TABLE flink_doris_source (
        name STRING,
        age INT,
        price DECIMAL(5,2),
        sale DOUBLE
    )
    WITH (
        'connector' = 'doris',
        'fenodes' = 'FE_IP:HTTP_PORT',
        'table.identifier' = 'database.table',
        'username' = 'root',
        'password' = 'password'
    );
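The `WITH` clause is a flat set of string key/value options. When the connection settings come from configuration instead of being hard-coded, a DDL statement like the one above can be assembled in code. The resulting string (without a trailing semicolon) would then be passed to Flink's `TableEnvironment.executeSql`.

In the sketch below, the class and method names are hypothetical, while the option keys are the ones used above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper: renders a Flink SQL CREATE TABLE statement backed by the doris connector. */
final class DorisDdl {
    static String createTable(String table, Map<String, String> columns, Map<String, String> options) {
        StringJoiner cols = new StringJoiner(",\n  ", "(\n  ", "\n)");
        columns.forEach((name, type) -> cols.add(name + " " + type));
        StringJoiner with = new StringJoiner(",\n  ", "WITH (\n  ", "\n)");
        with.add("'connector' = 'doris'");
        // Single quotes inside values are doubled, as SQL string literals require.
        options.forEach((k, v) -> with.add("'" + k + "' = '" + v.replace("'", "''") + "'"));
        return "CREATE TABLE " + table + " " + cols + "\n" + with;
    }

    public static void main(String[] args) {
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("name", "STRING");
        columns.put("age", "INT");
        Map<String, String> options = new LinkedHashMap<>(); // insertion order = output order
        options.put("fenodes", "FE_IP:HTTP_PORT");
        options.put("table.identifier", "database.table");
        options.put("username", "root");
        options.put("password", "password");
        System.out.println(createTable("flink_doris_source", columns, options));
    }
}
```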

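DataStream

    import java.util.List;
    import org.apache.doris.flink.cfg.DorisOptions;
    import org.apache.doris.flink.cfg.DorisReadOptions;
    import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
    import org.apache.doris.flink.source.DorisSource;
    import org.apache.doris.flink.source.DorisSourceBuilder;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class DorisSourceExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Doris FE address (HTTP port) and the table to read
            DorisOptions.Builder builder = DorisOptions.builder()
                    .setFenodes("FE_IP:HTTP_PORT")
                    .setTableIdentifier("db.table")
                    .setUsername("root")
                    .setPassword("password");

            // each Doris row is emitted as a List<?> of column values
            DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
                    .setDorisOptions(builder.build())
                    .setDorisReadOptions(DorisReadOptions.builder().build())
                    .setDeserializer(new SimpleListDeserializationSchema())
                    .build();

            env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
            env.execute("doris source");
        }
    }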
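With `SimpleListDeserializationSchema`, downstream operators receive untyped `List<?>` rows. A typed view is usually easier to work with. The sketch below assumes three things:

- values arrive in table column order (name, age, price, sale);
- values are non-null;
- numeric values arrive as some `Number` subtype.

The exact Java types the connector produces are not stated in this article. `SaleRow` is a hypothetical class that could be applied with `.map(SaleRow::fromList)` on the stream returned by `env.fromSource(...)`.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;

/** Hypothetical typed view of one flink_doris_source row: (name, age, price, sale). */
final class SaleRow {
    final String name;
    final int age;
    final BigDecimal price;
    final double sale;

    SaleRow(String name, int age, BigDecimal price, double sale) {
        this.name = name;
        this.age = age;
        this.price = price;
        this.sale = sale;
    }

    /** Converts one List<?> row; assumes column order and non-null values. */
    static SaleRow fromList(List<?> row) {
        if (row.size() != 4) {
            throw new IllegalArgumentException("Expected 4 columns, got " + row.size());
        }
        return new SaleRow(
                String.valueOf(row.get(0)),
                ((Number) row.get(1)).intValue(),
                // Going through toString() avoids binary rounding if the value arrives as a Double.
                new BigDecimal(row.get(2).toString()),
                ((Number) row.get(3)).doubleValue());
    }

    @Override
    public String toString() {
        return name + "," + age + "," + price + "," + sale;
    }

    public static void main(String[] args) {
        List<?> row = Arrays.asList("doris", 3, new BigDecimal("12.50"), 9.9);
        System.out.println(fromList(row)); // doris,3,12.50,9.9
    }
}
```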

02

Write

SQL

    -- enable checkpointing (the Doris sink commits its Stream Load writes on checkpoints)
    SET 'execution.checkpointing.interval' = '10s';

    -- doris sink
    CREATE TABLE flink_doris_sink (
        name STRING,
        age INT,
        price DECIMAL(5,2),
        sale DOUBLE
    )
    WITH (
        'connector' = 'doris',
        'fenodes' = 'FE_IP:HTTP_PORT',
        'table.identifier' = 'db.table',
        'username' = 'root',
        'password' = 'password',
        'sink.label-prefix' = 'doris_label'
    );

    -- submit the insert job (reads from the flink_doris_source table defined above)
    INSERT INTO flink_doris_sink SELECT name, age, price, sale FROM flink_doris_source;

DataStream

DorisSink writes data to Doris through Stream Load. When writing with the DataStream API, several serialization methods are supported.

String data stream (SimpleStringSerializer)
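    import java.util.ArrayList;
    import java.util.List;

    import org.apache.doris.flink.cfg.DorisExecutionOptions;
    import org.apache.doris.flink.cfg.DorisOptions;
    import org.apache.doris.flink.cfg.DorisReadOptions;
    import org.apache.doris.flink.sink.DorisSink;
    // serializer package path may differ between connector versions
    import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class DorisStringSinkExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // enable checkpoint
            env.enableCheckpointing(10000);
            // using batch mode for bounded data
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);

            DorisSink.Builder<String> builder = DorisSink.builder();
            DorisOptions.Builder dorisBuilder = DorisOptions.builder();
            dorisBuilder.setFenodes("FE_IP:HTTP_PORT")
                    .setTableIdentifier("db.table")
                    .setUsername("root")
                    .setPassword("password");

            DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
            executionBuilder.setLabelPrefix("label-doris") // Stream Load label prefix
                    .setDeletable(false);

            builder.setDorisReadOptions(DorisReadOptions.builder().build())
                    .setDorisExecutionOptions(executionBuilder.build())
                    .setSerializer(new SimpleStringSerializer()) // each record is sent as a plain string
                    .setDorisOptions(dorisBuilder.build());

            // mock string source
            List<Tuple2<String, Integer>> data = new ArrayList<>();
            data.add(new Tuple2<>("doris", 1));
            DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data);

            // one tab-separated line per record
            source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "\t" + t.f1)
                    .sinkTo(builder.build());

            env.execute("doris string sink");
        }
    }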
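With `SimpleStringSerializer`, each string becomes one line of the Stream Load body. The mock source above joins fields with `\t`, which is Stream Load's default CSV column separator. Doris reads `\N` in CSV input as NULL, so a formatter should emit that marker for null fields.

The formatter below is a hypothetical sketch. It does not escape tabs or newlines inside values. It could replace the lambda above, for example `t -> TsvLine.format(Arrays.asList(t.f0, t.f1))`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical formatter: turns one row into a tab-separated line for Stream Load's CSV format. */
final class TsvLine {
    static String format(List<?> fields) {
        return fields.stream()
                // Doris interprets \N in CSV input as NULL
                .map(f -> f == null ? "\\N" : f.toString())
                .collect(Collectors.joining("\t"));
    }

    public static void main(String[] args) {
        System.out.println(format(Arrays.asList("doris", 1)));    // doris<TAB>1
        System.out.println(format(Arrays.asList("doris", null))); // doris<TAB>\N
    }
}
```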

RowData data stream (RowDataSerializer)

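    import java.time.LocalDate;
    import java.util.Properties;

    import org.apache.doris.flink.cfg.DorisExecutionOptions;
    import org.apache.doris.flink.cfg.DorisOptions;
    import org.apache.doris.flink.cfg.DorisReadOptions;
    import org.apache.doris.flink.sink.DorisSink;
    // serializer package path may differ between connector versions
    import org.apache.doris.flink.sink.writer.RowDataSerializer;
    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.types.DataType;

    public class DorisRowDataSinkExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // enable checkpoint
            env.enableCheckpointing(10000);
            // using batch mode for bounded data
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);

            // doris sink option
            DorisSink.Builder<RowData> builder = DorisSink.builder();
            DorisOptions.Builder dorisBuilder = DorisOptions.builder();
            dorisBuilder.setFenodes("FE_IP:HTTP_PORT")
                    .setTableIdentifier("db.table")
                    .setUsername("root")
                    .setPassword("password");

            // send data to Stream Load as JSON, one object per line
            Properties properties = new Properties();
            properties.setProperty("format", "json");
            properties.setProperty("read_json_by_line", "true");
            DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
            executionBuilder.setLabelPrefix("label-doris") // Stream Load label prefix
                    .setDeletable(false)
                    .setStreamLoadProp(properties); // Stream Load parameters

            // schema of the Flink RowData
            String[] fields = {"city", "longitude", "latitude", "destroy_date"};
            DataType[] types = {DataTypes.VARCHAR(256), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DATE()};

            builder.setDorisReadOptions(DorisReadOptions.builder().build())
                    .setDorisExecutionOptions(executionBuilder.build())
                    .setSerializer(RowDataSerializer.builder() // serialize according to RowData
                            .setFieldNames(fields)
                            .setType("json") // json format
                            .setFieldType(types).build())
                    .setDorisOptions(dorisBuilder.build());

            // mock RowData source
            DataStream<RowData> source = env.fromElements("")
                    .map(new MapFunction<String, RowData>() {
                        @Override
                        public RowData map(String value) throws Exception {
                            GenericRowData genericRowData = new GenericRowData(4);
                            genericRowData.setField(0, StringData.fromString("beijing"));
                            genericRowData.setField(1, 116.405419);
                            genericRowData.setField(2, 39.916927);
                            // RowData stores DATE as an int: days since 1970-01-01
                            genericRowData.setField(3, (int) LocalDate.now().toEpochDay());
                            return genericRowData;
                        }
                    });

            source.sinkTo(builder.build());
            env.execute("doris rowdata sink");
        }
    }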
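With `format=json` and `read_json_by_line=true`, Stream Load expects one JSON object per line. The stdlib-only sketch below shows roughly what such a line looks like for the mock row above. It also shows how the `int` epoch-day DATE value maps back to a calendar date.

The class is hypothetical. The exact JSON the connector emits (field order, number and date formatting) may differ.

```java
import java.time.LocalDate;

/** Hypothetical illustration of one JSON line for the (city, longitude, latitude, destroy_date) row. */
final class CityJsonLine {
    static String toJson(String city, double longitude, double latitude, int epochDay) {
        // RowData stores DATE as days since 1970-01-01; render it back as yyyy-MM-dd.
        LocalDate date = LocalDate.ofEpochDay(epochDay);
        // city is assumed to need no JSON escaping
        return "{\"city\":\"" + city + "\",\"longitude\":" + longitude
                + ",\"latitude\":" + latitude + ",\"destroy_date\":\"" + date + "\"}";
    }

    public static void main(String[] args) {
        int day = (int) LocalDate.of(2023, 9, 14).toEpochDay();
        System.out.println(toJson("beijing", 116.405419, 39.916927, day));
    }
}
```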

SchemaChange data stream (JsonDebeziumSchemaSerializer)

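    import java.util.Properties;

    // assumes Flink CDC 2.x (flink-connector-mysql-cdc) is on the classpath
    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    import org.apache.doris.flink.cfg.DorisExecutionOptions;
    import org.apache.doris.flink.cfg.DorisOptions;
    import org.apache.doris.flink.cfg.DorisReadOptions;
    import org.apache.doris.flink.sink.DorisSink;
    import org.apache.doris.flink.sink.writer.JsonDebeziumSchemaSerializer; // path may differ by version
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class DorisSchemaChangeExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // enable checkpoint
            env.enableCheckpointing(10000);

            // MySQL CDC source (placeholder connection values), emitting Debezium JSON incl. DDL events
            MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                    .hostname("127.0.0.1")
                    .port(3306)
                    .databaseList("test")
                    .tableList("test.t1")
                    .username("root")
                    .password("password")
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    .includeSchemaChanges(true)
                    .build();

            Properties props = new Properties();
            props.setProperty("format", "json");
            props.setProperty("read_json_by_line", "true");
            DorisOptions dorisOptions = DorisOptions.builder()
                    .setFenodes("127.0.0.1:8030")
                    .setTableIdentifier("test.t1")
                    .setUsername("root")
                    .setPassword("").build();

            DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
            executionBuilder.setLabelPrefix("label-prefix")
                    .setStreamLoadProp(props).setDeletable(true); // apply CDC deletes

            DorisSink.Builder<String> builder = DorisSink.builder();
            builder.setDorisReadOptions(DorisReadOptions.builder().build())
                    .setDorisExecutionOptions(executionBuilder.build())
                    .setDorisOptions(dorisOptions)
                    .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());

            env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                    .sinkTo(builder.build());
            env.execute("mysql to doris");
        }
    }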
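`JsonDebeziumSchemaSerializer` consumes Debezium-style change records. Their `op` field is one of the following:

- `c`: create
- `r`: snapshot read
- `u`: update
- `d`: delete

The mapping below is a simplified, hypothetical illustration of how such records can turn into Doris upserts or deletes via the delete-sign column, which is why the sink above uses `setDeletable(true)`. It is not the serializer's actual code, which also handles DDL events for schema change.

```java
/** Hypothetical illustration: which image of a Debezium change record to write, and with what delete sign. */
final class DebeziumOpMapping {
    enum Action { UPSERT_AFTER, DELETE_BEFORE }

    static Action actionFor(String op) {
        switch (op) {
            case "c": // insert
            case "r": // snapshot read
            case "u": // update: the "after" image overwrites the row with the same unique key
                // (simplification: if the key itself changed, the old key would also need a delete)
                return Action.UPSERT_AFTER;
            case "d": // delete: the "before" image carries the key, written with __DORIS_DELETE_SIGN__ = 1
                return Action.DELETE_BEFORE;
            default:
                throw new IllegalArgumentException("Unsupported Debezium op: " + op);
        }
    }

    static int deleteSign(String op) {
        return actionFor(op) == Action.DELETE_BEFORE ? 1 : 0;
    }

    public static void main(String[] args) {
        for (String op : new String[] {"c", "r", "u", "d"}) {
            System.out.println(op + " -> " + actionFor(op) + ", delete sign " + deleteSign(op));
        }
    }
}
```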


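This article is reposted from 大数据技能圈. If you suspect infringement, please email contact@modb.pro with supporting evidence; once verified, 墨天轮 (modb.pro) will remove the content immediately.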
