暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

建议收藏 | 这10个Flink操作doris代码demo助你效率翻倍!

大数据技能圈 2023-09-17
55

Flink 连接 Doris

Flink可以通过各种方式连接Doris,下面是我整理的10个demo代码,亲测有效,建议收藏~

1

CDCSchemaChangeExample

    package org.apache.doris.flink;


    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.connectors.mysql.table.StartupOptions;
    import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    import org.apache.doris.flink.cfg.DorisExecutionOptions;
    import org.apache.doris.flink.cfg.DorisOptions;
    import org.apache.doris.flink.cfg.DorisReadOptions;
    import org.apache.doris.flink.sink.DorisSink;
    import org.apache.doris.flink.sink.writer.JsonDebeziumSchemaSerializer;
    import org.apache.doris.flink.utils.DateToStringConverter;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import java.util.UUID;


    /**
     * Demo 1: MySQL CDC -> Doris with automatic schema-change synchronization.
     *
     * <p>Captures the binlog of MySQL table {@code test.t1} as Debezium-style JSON
     * (DDL events included) and streams it into the Doris table of the same name via
     * {@code JsonDebeziumSchemaSerializer}, which applies schema changes on the fly.
     */
    public class CDCSchemaChangeExample {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // Checkpointing is required for exactly-once delivery into the Doris sink.
            env.enableCheckpointing(10000);

            // Render Debezium DECIMAL fields as plain numbers instead of encoded bytes.
            Map<String, Object> converterConfig = new HashMap<>();
            converterConfig.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
            JsonDebeziumDeserializationSchema deserializer =
                    new JsonDebeziumDeserializationSchema(false, converterConfig);

            // CDC source over test.t1; includeSchemaChanges(true) also emits DDL records.
            MySqlSource<String> cdcSource = MySqlSource.<String>builder()
                    .hostname("127.0.0.1")
                    .port(3306)
                    .databaseList("test") // captured database
                    .tableList("test.t1") // captured table
                    .username("root")
                    .password("123456")
                    .debeziumProperties(DateToStringConverter.DEFAULT_PROPS)
                    .deserializer(deserializer)
                    .serverTimeZone("Asia/Shanghai")
                    .includeSchemaChanges(true) // converts SourceRecord to JSON String
                    .build();

            // Stream-load payload format: one JSON document per line.
            Properties loadProps = new Properties();
            loadProps.setProperty("format", "json");
            loadProps.setProperty("read_json_by_line", "true");

            DorisOptions dorisOptions = DorisOptions.builder()
                    .setFenodes("127.0.0.1:8030")
                    .setTableIdentifier("test.t1")
                    .setUsername("root")
                    .setPassword("")
                    .build();

            // Random label prefix keeps stream-load labels unique across restarts.
            DorisExecutionOptions.Builder execBuilder = DorisExecutionOptions.builder();
            execBuilder.setLabelPrefix("label-doris" + UUID.randomUUID())
                    .setStreamLoadProp(loadProps)
                    .setDeletable(true);

            DorisSink.Builder<String> sinkBuilder = DorisSink.builder();
            sinkBuilder.setDorisReadOptions(DorisReadOptions.builder().build())
                    .setDorisExecutionOptions(execBuilder.build())
                    .setDorisOptions(dorisOptions)
                    .setSerializer(JsonDebeziumSchemaSerializer.builder()
                            .setDorisOptions(dorisOptions)
                            .setNewSchemaChange(true)
                            .build());

            env.fromSource(cdcSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                    .sinkTo(sinkBuilder.build());

            env.execute("Print MySQL Snapshot + Binlog");
        }
    }

    2

    DorisDateAndTimestampSqlTest

      package org.apache.doris.flink;


      import org.apache.flink.table.api.EnvironmentSettings;
      import org.apache.flink.table.api.TableEnvironment;


      import java.util.UUID;


      /**
       * Demo 2: Flink SQL (batch mode) job that reads a Doris source table and writes
       * into a Doris sink table, converting a TIMESTAMP column to DATE on the way.
       *
       * <p>NOTE(review): the 'FE_HOST:FE_PORT' placeholders must be replaced with a
       * real Doris frontend address before running.
       */
      public class DorisDateAndTimestampSqlTest {


      public static void main(String[] args) {
      // Batch mode: the job reads a bounded snapshot of the source table.
      TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
      // Source table mapped onto Doris table db.source_table via the doris connector.
      tEnv.executeSql("create table test_source ( " +
      " id INT, " +
      " score DECIMAL(10, 9), " +
      " submit_time TIMESTAMP " +
      " ) with ( " +
      " 'password'='', " +
      " 'connector'='doris', " +
      " 'fenodes'='FE_HOST:FE_PORT', " +
      " 'table.identifier'='db.source_table', " +
      " 'username'='root' " +
      ")");


      // Sink table: same schema except submit_time is DATE instead of TIMESTAMP.
      // The random UUID in sink.label-prefix keeps stream-load labels unique per run.
      tEnv.executeSql("create table test_sink ( " +
      " id INT, " +
      " score DECIMAL(10, 9), " +
      " submit_time DATE " +
      " ) with ( " +
      " 'password'='', " +
      " 'connector'='doris', " +
      " 'fenodes'='FE_HOST:FE_PORT', " +
      " 'sink.label-prefix' = 'label_" + UUID.randomUUID()+"' , " +
      " 'table.identifier'='db.sink_table', " +
      " 'username'='root' " +
      ")");
      // Copy rows submitted on or after 2022-05-31, truncating the timestamp to a date.
      tEnv.executeSql(
      "insert into " +
      " test_sink " +
      "select " +
      " id, " +
      " score," +
      " to_date(DATE_FORMAT(submit_time, 'yyyy-MM-dd')) as submit_time " +
      "from " +
      " test_source " +
      "where " +
      " submit_time>='2022-05-31 00:00:00'")
      .print();
      }


      }

      3

      DorisIntranetAccessSinkExample

        /**
         * Demo 3: DataStream sink that reaches Doris BE nodes on an intranet.
         *
         * <p>Besides the FE address, the BE addresses are configured explicitly via
         * {@code setBenodes} so stream load does not depend on FE redirection to
         * hostnames that may not resolve from outside the cluster network.
         */
        public class DorisIntranetAccessSinkExample {

            public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                env.setRuntimeMode(RuntimeExecutionMode.BATCH);
                // Checkpoint every 10s; retain externalized checkpoints on cancellation.
                env.enableCheckpointing(10000);
                env.getCheckpointConfig()
                        .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
                env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000)));

                // Read-side (Arrow) tuning — relevant only if this job also reads Doris.
                final DorisReadOptions.Builder readOptions = DorisReadOptions.builder();
                readOptions.setDeserializeArrowAsync(false)
                        .setDeserializeQueueSize(64)
                        .setExecMemLimit(2147483648L)
                        .setRequestQueryTimeoutS(3600)
                        .setRequestBatchSize(1000)
                        .setRequestConnectTimeoutMs(10000)
                        .setRequestReadTimeoutMs(10000)
                        .setRequestRetries(3)
                        .setRequestTabletSize(1024 * 1024);

                // CSV stream-load payload: comma-separated columns, one row per line.
                Properties loadProps = new Properties();
                loadProps.setProperty("column_separator", ",");
                loadProps.setProperty("line_delimiter", "\n");
                loadProps.setProperty("format", "csv");

                DorisOptions.Builder connection = DorisOptions.builder();
                connection.setFenodes("10.20.30.1:8030")
                        .setBenodes("10.20.30.1:8040, 10.20.30.2:8040, 10.20.30.3:8040")
                        .setTableIdentifier("test.test_sink")
                        .setUsername("root")
                        .setPassword("");

                // Two-phase commit disabled: each stream load commits independently.
                DorisExecutionOptions.Builder execution = DorisExecutionOptions.builder();
                execution.disable2PC()
                        .setLabelPrefix("label-doris")
                        .setStreamLoadProp(loadProps)
                        .setBufferSize(8 * 1024)
                        .setBufferCount(3);

                DorisSink.Builder<String> sink = DorisSink.builder();
                sink.setDorisReadOptions(readOptions.build())
                        .setDorisExecutionOptions(execution.build())
                        .setSerializer(new SimpleStringSerializer())
                        .setDorisOptions(connection.build());

                // Three fixed rows rendered as "id,name" CSV lines.
                List<Tuple2<Integer, String>> rows = new ArrayList<>();
                rows.add(new Tuple2<>(1, "zhangsan"));
                rows.add(new Tuple2<>(2, "lisi"));
                rows.add(new Tuple2<>(3, "wangwu"));
                DataStreamSource<Tuple2<Integer, String>> source = env.fromCollection(rows);
                source.map((MapFunction<Tuple2<Integer, String>, String>) t -> t.f0 + "," + t.f1)
                        .sinkTo(sink.build());
                env.execute("doris test");
            }
        }

        4

        DorisSinkArraySQLExample

          package org.apache.doris.flink;


          import org.apache.flink.api.common.RuntimeExecutionMode;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


          import java.util.UUID;


          /**
           * Demo 4: Flink SQL job that copies ARRAY-typed columns between Doris tables.
           *
           * <p>A datagen table ("source") documents the Flink-side array schema; the
           * actual pipeline reads "source_doris" and inserts into "sink". Doris
           * ARRAY&lt;DATE&gt; and ARRAY&lt;TIMESTAMP&gt; columns are mapped to
           * array&lt;STRING&gt; on the Flink side (see the inline comments below).
           */
          public class DorisSinkArraySQLExample {


          public static void main(String[] args) throws Exception{
          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          env.setParallelism(1);
          env.setRuntimeMode(RuntimeExecutionMode.BATCH);
          final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
          // Synthetic datagen table covering every supported array element type.
          // Registered for reference/testing; the INSERT below does not read it.
          tEnv.executeSql("CREATE TABLE source (\n" +
          " `id` int,\n" +
          " `c_1` array<INT> ,\n" +
          " `c_2` array<TINYINT> ,\n" +
          " `c_3` array<SMALLINT> ,\n" +
          " `c_4` array<INT> ,\n" +
          " `c_5` array<BIGINT> ,\n" +
          " `c_6` array<BIGINT> ,\n" +
          " `c_7` array<FLOAT>,\n" +
          " `c_8` array<DOUBLE> ,\n" +
          " `c_9` array<DECIMAL(4,2)> ,\n" +
          " `c_10` array<DATE> ,\n" +
          " `c_11` array<DATE> ,\n" +
          " `c_12` array<TIMESTAMP> ,\n" +
          " `c_13` array<TIMESTAMP> ,\n" +
          " `c_14` array<CHAR(10)> ,\n" +
          " `c_15` array<VARCHAR(256)> ,\n" +
          " `c_16` array<STRING> \n" +
          ") WITH (\n" +
          " 'connector' = 'datagen', \n" +
          " 'fields.c_7.element.min' = '1', \n" +
          " 'fields.c_7.element.max' = '10', \n" +
          " 'fields.c_8.element.min' = '1', \n" +
          " 'fields.c_8.element.max' = '10', \n" +
          " 'fields.c_14.element.length' = '10', \n" +
          " 'fields.c_15.element.length' = '10', \n" +
          " 'fields.c_16.element.length' = '10', \n" +
          " 'number-of-rows' = '5' \n" +
          ");");




          // Doris-backed source table test.array_test_type; date/timestamp arrays are
          // declared as array<STRING> on the Flink side.
          tEnv.executeSql("CREATE TABLE source_doris (" +
          " `id` int,\n" +
          " `c_1` array<INT> ,\n" +
          " `c_2` array<TINYINT> ,\n" +
          " `c_3` array<SMALLINT> ,\n" +
          " `c_4` array<INT> ,\n" +
          " `c_5` array<BIGINT> ,\n" +
          " `c_6` array<STRING> ,\n" +
          " `c_7` array<FLOAT> ,\n" +
          " `c_8` array<DOUBLE> ,\n" +
          " `c_9` array<DECIMAL(4,2)> ,\n" +
          " `c_10` array<STRING> ,\n" + //ARRAY<DATE>
          " `c_11` array<STRING> ,\n" + //ARRAY<DATE>
          " `c_12` array<STRING> ,\n" + //ARRAY<TIMESTAMP>
          " `c_13` array<STRING> ,\n" + //ARRAY<TIMESTAMP>
          " `c_14` array<CHAR(10)> ,\n" +
          " `c_15` array<VARCHAR(256)> ,\n" +
          " `c_16` array<STRING> \n" +
          ") WITH (" +
          " 'connector' = 'doris',\n" +
          " 'fenodes' = '127.0.0.1:8030',\n" +
          " 'table.identifier' = 'test.array_test_type',\n" +
          " 'username' = 'root',\n" +
          " 'password' = ''\n" +
          ")");






          // define a dynamic aggregating query
          // final Table result = tEnv.sqlQuery("SELECT * from source_doris ");
          //
          // print the result to the console
          // tEnv.toRetractStream(result, Row.class).print();
          // env.execute();


          // Doris sink table with the same Flink-side schema as source_doris; a random
          // UUID suffix keeps the stream-load label prefix unique across runs.
          tEnv.executeSql(
          "CREATE TABLE sink (" +
          " `id` int,\n" +
          " `c_1` array<INT> ,\n" +
          " `c_2` array<TINYINT> ,\n" +
          " `c_3` array<SMALLINT> ,\n" +
          " `c_4` array<INT> ,\n" +
          " `c_5` array<BIGINT> ,\n" +
          " `c_6` array<STRING> ,\n" +
          " `c_7` array<FLOAT> ,\n" +
          " `c_8` array<DOUBLE> ,\n" +
          " `c_9` array<DECIMAL(4,2)> ,\n" +
          " `c_10` array<STRING> ,\n" + //ARRAY<DATE>
          " `c_11` array<STRING> ,\n" + //ARRAY<DATE>
          " `c_12` array<STRING> ,\n" + //ARRAY<TIMESTAMP>
          " `c_13` array<STRING> ,\n" + //ARRAY<TIMESTAMP>
          " `c_14` array<CHAR(10)> ,\n" +
          " `c_15` array<VARCHAR(256)> ,\n" +
          " `c_16` array<STRING> \n" +
          ") " +
          "WITH (\n" +
          " 'connector' = 'doris',\n" +
          " 'fenodes' = '127.0.0.1:8030',\n" +
          " 'table.identifier' = 'test.array_test_type_sink',\n" +
          " 'username' = 'root',\n" +
          " 'password' = '',\n" +
          " 'sink.label-prefix' = 'doris_label4" + UUID.randomUUID() + "'" +
          ")");
          // Doris-to-Doris copy of all array columns.
          tEnv.executeSql("INSERT INTO sink select * from source_doris");


          }
          }

          5

          DorisSinkBatchExample

            package org.apache.doris.flink;


            import org.apache.doris.flink.cfg.DorisExecutionOptions;
            import org.apache.doris.flink.cfg.DorisOptions;
            import org.apache.doris.flink.cfg.DorisReadOptions;
            import org.apache.doris.flink.sink.batch.DorisBatchSink;
            import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
            import org.apache.flink.streaming.api.datastream.DataStreamSource;
            import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
            import org.apache.flink.streaming.api.functions.source.SourceFunction;


            import java.util.Arrays;
            import java.util.Properties;
            import java.util.UUID;




            /**
             * Demo 5: DorisBatchSink — buffered, non-transactional batch writing.
             *
             * <p>{@code main} streams an unbounded generated sequence into
             * test.test_flink; {@code testBatchFlush} writes a small fixed list with
             * flush-per-row settings. Flushes are triggered by max bytes, max rows, or
             * the flush interval, whichever is hit first.
             */
            public class DorisSinkBatchExample {
            public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.enableCheckpointing(5000);
            // env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000)));
            DorisBatchSink.Builder<String> builder = DorisBatchSink.builder();
            // Read-side tuning; only relevant if this job also reads from Doris.
            final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
            readOptionBuilder.setDeserializeArrowAsync(false)
            .setDeserializeQueueSize(64)
            .setExecMemLimit(2147483648L)
            .setRequestQueryTimeoutS(3600)
            .setRequestBatchSize(1000)
            .setRequestConnectTimeoutMs(10000)
            .setRequestReadTimeoutMs(10000)
            .setRequestRetries(3)
            .setRequestTabletSize(1024 * 1024);
            // CSV stream-load payload: comma-separated columns, one row per line.
            Properties properties = new Properties();
            properties.setProperty("column_separator", ",");
            properties.setProperty("line_delimiter", "\n");
            properties.setProperty("format", "csv");
            DorisOptions.Builder dorisBuilder = DorisOptions.builder();
            dorisBuilder.setFenodes("127.0.0.1:8030")
            .setTableIdentifier("test.test_flink")
            .setUsername("root")
            .setPassword("");
            // Flush when the buffer reaches 8 KiB or 900 rows, or every 10 seconds.
            DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
            executionBuilder.setLabelPrefix("label")
            .setStreamLoadProp(properties)
            .setDeletable(false)
            .setBufferFlushMaxBytes(8 * 1024)
            .setBufferFlushMaxRows(900)
            .setBufferFlushIntervalMs(1000 * 10);


            builder.setDorisReadOptions(readOptionBuilder.build())
            .setDorisExecutionOptions(executionBuilder.build())
            .setSerializer(new SimpleStringSerializer())
            .setDorisOptions(dorisBuilder.build());


            // Unbounded generator: emits one "id,uuid,id" CSV row every 500 ms.
            env.addSource(new SourceFunction<String>() {
            private Long id = 0L;


            @Override
            public void run(SourceContext<String> out) throws Exception {
            while (true) {
            id = id + 1;
            String record = id + "," + UUID.randomUUID() + "," + id + "";
            out.collect(record);
            Thread.sleep(500);
            }
            }


            // NOTE(review): cancel() does not stop the while(true) loop above; the
            // source only terminates when the task thread is interrupted.
            @Override
            public void cancel() {


            }
            }).sinkTo(builder.build());


            env.execute("doris batch test");
            }


            // Bounded variant: writes five fixed rows into test.testd, flushing after
            // every row (setBufferFlushMaxRows(1)).
            public void testBatchFlush() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);


            DorisBatchSink.Builder<String> builder = DorisBatchSink.builder();
            final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();


            readOptionBuilder.setDeserializeArrowAsync(false)
            .setDeserializeQueueSize(64)
            .setExecMemLimit(2147483648L)
            .setRequestQueryTimeoutS(3600)
            .setRequestBatchSize(1000)
            .setRequestConnectTimeoutMs(10000)
            .setRequestReadTimeoutMs(10000)
            .setRequestRetries(3)
            .setRequestTabletSize(1024 * 1024);


            Properties properties = new Properties();
            properties.setProperty("column_separator", ",");
            properties.setProperty("line_delimiter", "\n");
            properties.setProperty("format", "csv");
            DorisOptions.Builder dorisBuilder = DorisOptions.builder();
            dorisBuilder.setFenodes("127.0.0.1:8030")
            .setTableIdentifier("test.testd")
            .setUsername("root")
            .setPassword("");


            DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();


            executionBuilder.setLabelPrefix("label")
            .setStreamLoadProp(properties)
            .setDeletable(false)
            .setBufferFlushMaxBytes(8 * 1024)
            .setBufferFlushMaxRows(1)
            .setBufferFlushIntervalMs(1000 * 10);


            builder.setDorisReadOptions(readOptionBuilder.build())
            .setDorisExecutionOptions(executionBuilder.build())
            .setSerializer(new SimpleStringSerializer())
            .setDorisOptions(dorisBuilder.build());




            DataStreamSource<String> stringDataStreamSource = env.fromCollection(
            Arrays.asList("1,-74159.9193252453", "2,-74159.9193252453", "3,-19.7004480979", "4,43385.2170333507", "5,-16.2602598554"));
            stringDataStreamSource.sinkTo(builder.build());


            env.execute("doris batch test");


            }
            }

            6

            DorisSinkExample

              package org.apache.doris.flink;


              import org.apache.doris.flink.cfg.DorisExecutionOptions;
              import org.apache.doris.flink.cfg.DorisOptions;
              import org.apache.doris.flink.cfg.DorisReadOptions;
              import org.apache.doris.flink.sink.DorisSink;
              import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
              import org.apache.flink.api.common.RuntimeExecutionMode;
              import org.apache.flink.api.common.functions.MapFunction;
              import org.apache.flink.api.common.restartstrategy.RestartStrategies;
              import org.apache.flink.api.common.time.Time;
              import org.apache.flink.api.java.tuple.Tuple2;
              import org.apache.flink.streaming.api.TimeCharacteristic;
              import org.apache.flink.streaming.api.datastream.DataStreamSource;
              import org.apache.flink.streaming.api.environment.CheckpointConfig;
              import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


              import java.util.ArrayList;
              import java.util.List;
              import java.util.Properties;




              /**
               * Demo 6: minimal DataStream -> Doris sink.
               *
               * <p>Writes a single "doris,1" CSV row into db.table via stream load.
               */
              public class DorisSinkExample {

                  public static void main(String[] args) throws Exception{
                      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                      env.setRuntimeMode(RuntimeExecutionMode.BATCH);
                      // Checkpoint every 10s; retain externalized checkpoints on cancellation.
                      env.enableCheckpointing(10000);
                      env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
                      env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000)));

                      // Read-side tuning — relevant only if this job also reads Doris.
                      final DorisReadOptions.Builder readOptions = DorisReadOptions.builder();
                      readOptions.setDeserializeArrowAsync(false)
                              .setDeserializeQueueSize(64)
                              .setExecMemLimit(2147483648L)
                              .setRequestQueryTimeoutS(3600)
                              .setRequestBatchSize(1000)
                              .setRequestConnectTimeoutMs(10000)
                              .setRequestReadTimeoutMs(10000)
                              .setRequestRetries(3)
                              .setRequestTabletSize(1024 * 1024);

                      // CSV stream-load payload.
                      Properties loadProps = new Properties();
                      loadProps.setProperty("column_separator", ",");
                      loadProps.setProperty("line_delimiter", "\n");
                      loadProps.setProperty("format", "csv");

                      DorisOptions.Builder connection = DorisOptions.builder();
                      connection.setFenodes("127.0.0.1:8040")
                              .setTableIdentifier("db.table")
                              .setUsername("test")
                              .setPassword("test");

                      DorisExecutionOptions.Builder execution = DorisExecutionOptions.builder();
                      execution.setLabelPrefix("label-doris")
                              .setStreamLoadProp(loadProps)
                              .setBufferSize(8 * 1024)
                              .setBufferCount(3);

                      DorisSink.Builder<String> sink = DorisSink.builder();
                      sink.setDorisReadOptions(readOptions.build())
                              .setDorisExecutionOptions(execution.build())
                              .setSerializer(new SimpleStringSerializer())
                              .setDorisOptions(connection.build());

                      // One record, mapped to the CSV line "doris,1".
                      List<Tuple2<String, Integer>> rows = new ArrayList<>();
                      rows.add(new Tuple2<>("doris", 1));
                      DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(rows);
                      source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "," + t.f1)
                              .sinkTo(sink.build());
                      env.execute("doris test");
                  }
              }

              7

              DorisSinkExampleRowData

                package org.apache.doris.flink;


                import org.apache.doris.flink.cfg.DorisExecutionOptions;
                import org.apache.doris.flink.cfg.DorisOptions;
                import org.apache.doris.flink.sink.DorisSink;
                import org.apache.doris.flink.sink.writer.LoadConstants;
                import org.apache.doris.flink.sink.writer.RowDataSerializer;
                import org.apache.flink.api.common.RuntimeExecutionMode;
                import org.apache.flink.api.common.functions.FlatMapFunction;
                import org.apache.flink.api.common.restartstrategy.RestartStrategies;
                import org.apache.flink.api.common.time.Time;
                import org.apache.flink.streaming.api.TimeCharacteristic;
                import org.apache.flink.streaming.api.datastream.DataStream;
                import org.apache.flink.streaming.api.environment.CheckpointConfig;
                import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                import org.apache.flink.table.api.DataTypes;
                import org.apache.flink.table.data.GenericRowData;
                import org.apache.flink.table.data.RowData;
                import org.apache.flink.table.data.StringData;
                import org.apache.flink.table.types.DataType;
                import org.apache.flink.util.Collector;


                import java.util.Properties;
                import java.util.UUID;




                /**
                 * Demo 7: writing Flink {@code RowData} records into Doris.
                 *
                 * <p>A mock flatMap source emits two rows which {@code RowDataSerializer}
                 * renders as CSV according to the declared field names and types before
                 * stream loading into db.tbl.
                 */
                public class DorisSinkExampleRowData {

                    public static void main(String[] args) throws Exception{
                        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
                        env.enableCheckpointing(10000);
                        env.setParallelism(1);
                        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
                        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000)));

                        DorisSink.Builder<RowData> sink = DorisSink.builder();

                        // CSV payload; switch to the commented-out properties for JSON loads.
                        Properties loadProps = new Properties();
                        loadProps.setProperty("column_separator", ",");
                        loadProps.setProperty("line_delimiter", "\n");
                        // loadProps.setProperty("read_json_by_line", "true");
                        // loadProps.setProperty("format", "json");

                        DorisOptions.Builder connection = DorisOptions.builder();
                        connection.setFenodes("127.0.0.1:8030")
                                .setTableIdentifier("db.tbl")
                                .setUsername("root")
                                .setPassword("");

                        // Random label prefix keeps stream-load labels unique across runs.
                        DorisExecutionOptions.Builder execution = DorisExecutionOptions.builder();
                        execution.setLabelPrefix(UUID.randomUUID().toString())
                                .setStreamLoadProp(loadProps);

                        // Flink-side schema of the RowData records: (name VARCHAR(256), age INT).
                        String[] fieldNames = {"name", "age"};
                        DataType[] fieldTypes = {DataTypes.VARCHAR(256), DataTypes.INT()};

                        sink.setDorisExecutionOptions(execution.build())
                                .setSerializer(RowDataSerializer.builder() // serialize according to rowdata
                                        .setType(LoadConstants.CSV)
                                        .setFieldDelimiter(",")
                                        .setFieldNames(fieldNames)
                                        .setFieldType(fieldTypes)
                                        .build())
                                .setDorisOptions(connection.build());

                        // Mock source: a single empty element fanned out into two rows.
                        DataStream<RowData> source = env.fromElements("")
                                .flatMap(new FlatMapFunction<String, RowData>() {
                                    @Override
                                    public void flatMap(String ignored, Collector<RowData> out) throws Exception {
                                        GenericRowData first = new GenericRowData(2);
                                        first.setField(0, StringData.fromString("beijing"));
                                        first.setField(1, 123);
                                        out.collect(first);

                                        GenericRowData second = new GenericRowData(2);
                                        second.setField(0, StringData.fromString("shanghai"));
                                        second.setField(1, 1234);
                                        out.collect(second);
                                    }
                                });

                        source.sinkTo(sink.build());
                        env.execute("doris test");
                    }
                }

                8

                DorisSinkSQLExample

                  package org.apache.doris.flink;


                  import org.apache.flink.api.common.RuntimeExecutionMode;
                  import org.apache.flink.api.java.tuple.Tuple2;
                  import org.apache.flink.streaming.api.datastream.DataStreamSource;
                  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                  import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


                  import java.util.ArrayList;
                  import java.util.List;


                  import static org.apache.flink.table.api.Expressions.$;


                  /**
                   * Demo 8: Flink SQL sink example fed from a DataStream view.
                   *
                   * <p>A one-row in-memory stream is registered as view doris_test and
                   * inserted into Doris table db.table as JSON stream loads.
                   */
                  public class DorisSinkSQLExample {


                  public static void main(String[] args) {
                  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  env.setParallelism(1);
                  env.setRuntimeMode(RuntimeExecutionMode.BATCH);
                  final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


                  // Single-row source stream exposed to SQL as view doris_test(name, age).
                  List<Tuple2<String, Integer>> data = new ArrayList<>();
                  data.add(new Tuple2<>("doris",1));
                  DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data);
                  tEnv.createTemporaryView("doris_test",source,$("name"),$("age"));


                  // Doris sink table: JSON format, one JSON document per line.
                  // 'FE_IP' must be replaced with a real frontend address.
                  // NOTE(review): the fixed 'doris_label' prefix can collide with labels
                  // from earlier runs; a UUID suffix (as in the other demos) is safer.
                  tEnv.executeSql(
                  "CREATE TABLE doris_test_sink (" +
                  "name STRING," +
                  "age INT" +
                  ") " +
                  "WITH (\n" +
                  " 'connector' = 'doris',\n" +
                  " 'fenodes' = 'FE_IP:8030',\n" +
                  " 'table.identifier' = 'db.table',\n" +
                  " 'username' = 'root',\n" +
                  " 'password' = '',\n" +
                  " 'sink.properties.format' = 'json',\n" +
                  " 'sink.buffer-count' = '4',\n" +
                  " 'sink.buffer-size' = '4086'," +
                  " 'sink.label-prefix' = 'doris_label',\n" +
                  " 'sink.properties.read_json_by_line' = 'true'\n" +
                  ")");
                  tEnv.executeSql("INSERT INTO doris_test_sink select name,age from doris_test");
                  }
                  }

                  9

                  DorisSourceDataStream

                    package org.apache.doris.flink;


                    import org.apache.doris.flink.cfg.DorisStreamOptions;
                    import org.apache.doris.flink.datastream.DorisSourceFunction;
                    import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
                    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


                    import java.util.Properties;






                    /**
                     * Demo 9: DataStream source reading from Doris.
                     *
                     * <p>Reads columns id, code and name from db.table, with the predicate
                     * name='doris' pushed down to Doris, and prints each row.
                     */
                    public class DorisSourceDataStream {

                        public static void main(String[] args) throws Exception {
                            // Connection settings plus column projection and filter pushdown.
                            Properties conf = new Properties();
                            conf.put("fenodes", "FE_IP:8030");
                            conf.put("username", "root");
                            conf.put("password", "");
                            conf.put("table.identifier", "db.table");
                            conf.put("doris.read.field", "id,code,name");
                            conf.put("doris.filter.query", "name='doris'");
                            DorisStreamOptions options = new DorisStreamOptions(conf);

                            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                            env.setParallelism(2);
                            env.addSource(new DorisSourceFunction(options, new SimpleListDeserializationSchema())).print();
                            env.execute("Flink doris test");
                        }
                    }

                    10

                    DorisSourceExample

                      package org.apache.doris.flink;


                      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                      import org.apache.flink.table.api.Table;
                      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


                      /**
                       * Demo 10: Flink SQL source example — full scan of a Doris table.
                       *
                       * <p>Registers db.table as doris_source, selects everything and
                       * prints the rows. Doris char/date/datetime/largeint columns are
                       * declared as STRING on the Flink side.
                       */
                      public class DorisSourceExample {


                      public static void main(String[] args) throws Exception {


                      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                      env.setParallelism(1);


                      final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


                      // register a table in the catalog
                      // ('FE_IP' must be replaced with a real frontend address)
                      tEnv.executeSql(
                      "CREATE TABLE doris_source (" +
                      "bigint_1 BIGINT," +
                      "char_1 STRING," +
                      "date_1 STRING," +
                      "datetime_1 STRING," +
                      "decimal_1 DECIMAL(5,2)," +
                      "double_1 DOUBLE," +
                      "float_1 FLOAT ," +
                      "int_1 INT ," +
                      "largeint_1 STRING, " +
                      "smallint_1 SMALLINT, " +
                      "tinyint_1 TINYINT, " +
                      "varchar_1 STRING " +
                      ") " +
                      "WITH (\n" +
                      " 'connector' = 'doris',\n" +
                      " 'fenodes' = 'FE_IP:8030',\n" +
                      " 'table.identifier' = 'db.table',\n" +
                      " 'username' = 'root',\n" +
                      " 'password' = ''\n" +
                      ")");


                      // define a dynamic aggregating query
                      final Table result = tEnv.sqlQuery("SELECT * from doris_source ");


                      // print the result to the console
                      tEnv.toDataStream(result).print();
                      env.execute();
                      }
                      }

                      11

                      DorisSourceSinkExample

                        package org.apache.doris.flink;


                        import org.apache.flink.api.common.RuntimeExecutionMode;
                        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                        import org.apache.flink.table.api.Table;
                        import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
                        import org.apache.flink.types.Row;


                        import java.util.UUID;


                        public class DorisSourceSinkExample {


                        public static void main(String[] args) throws Exception {
                        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setParallelism(1);
                        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
                        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
                        tEnv.executeSql(
                        "CREATE TABLE doris_test (" +
                        " id int,\n" +
                        " c_1 boolean,\n" +
                        " c_2 tinyint,\n" +
                        " c_3 smallint,\n" +
                        " c_4 int,\n" +
                        " c_5 bigint,\n" +
                        " c_6 bigint,\n" +
                        " c_7 float,\n" +
                        " c_8 double,\n" +
                        " c_9 DECIMAL(4,2),\n" +
                        " c_10 DECIMAL(4,1),\n" +
                        " c_11 date,\n" +
                        " c_12 date,\n" +
                        " c_13 timestamp,\n" +
                        " c_14 timestamp,\n" +
                        " c_15 string,\n" +
                        " c_16 string,\n" +
                        " c_17 string,\n" +
                        " c_18 array<int>,\n" +
                        " c_19 Map<String,int>\n" +
                        ") " +
                        "WITH (\n" +
                        " 'connector' = 'datagen', \n" +
                        " 'fields.c_6.max' = '5', \n" +
                        " 'fields.c_9.max' = '5', \n" +
                        " 'fields.c_10.max' = '5', \n" +
                        " 'fields.c_15.length' = '5', \n" +
                        " 'fields.c_16.length' = '5', \n" +
                        " 'fields.c_17.length' = '5', \n" +
                        " 'fields.c_19.key.length' = '5', \n" +
                        " 'connector' = 'datagen', \n" +
                        " 'number-of-rows' = '1' \n" +
                        ")");


                        final Table result = tEnv.sqlQuery("SELECT * from doris_test ");


                        // print the result to the console
                        tEnv.toRetractStream(result, Row.class).print();
                        env.execute();


                        tEnv.executeSql(
                        "CREATE TABLE source_doris (" +
                        " id int,\n" +
                        " c_1 boolean,\n" +
                        " c_2 tinyint,\n" +
                        " c_3 smallint,\n" +
                        " c_4 int,\n" +
                        " c_5 bigint,\n" +
                        " c_6 string,\n" +
                        " c_7 float,\n" +
                        " c_8 double,\n" +
                        " c_9 DECIMAL(4,2),\n" +
                        " c_10 DECIMAL(4,1),\n" +
                        " c_11 date,\n" +
                        " c_12 date,\n" +
                        " c_13 timestamp,\n" +
                        " c_14 timestamp,\n" +
                        " c_15 string,\n" +
                        " c_16 string,\n" +
                        " c_17 string,\n" +
                        " c_18 array<int>,\n" +
                        " c_19 string\n" +
                        ") " +
                        "WITH (\n" +
                        " 'connector' = 'doris',\n" +
                        " 'fenodes' = '127.0.0.1:8030',\n" +
                        " 'table.identifier' = 'test.test_all_type',\n" +
                        " 'username' = 'root',\n" +
                        " 'password' = ''\n" +
                        ")");


                        tEnv.executeSql(
                        "CREATE TABLE doris_test_sink (" +
                        " id int,\n" +
                        " c_1 boolean,\n" +
                        " c_2 tinyint,\n" +
                        " c_3 smallint,\n" +
                        " c_4 int,\n" +
                        " c_5 bigint,\n" +
                        " c_6 string,\n" +
                        " c_7 float,\n" +
                        " c_8 double,\n" +
                        " c_9 DECIMAL(4,2),\n" +
                        " c_10 DECIMAL(4,1),\n" +
                        " c_11 date,\n" +
                        " c_12 date,\n" +
                        " c_13 timestamp,\n" +
                        " c_14 timestamp,\n" +
                        " c_15 string,\n" +
                        " c_16 string,\n" +
                        " c_17 string,\n" +
                        " c_18 array<int>,\n" +
                        " c_19 string\n" +
                        ") " +
                        "WITH (\n" +
                        " 'connector' = 'doris',\n" +
                        " 'fenodes' = '127.0.0.1:8030',\n" +
                        " 'table.identifier' = 'test.test_all_type_sink',\n" +
                        " 'username' = 'root',\n" +
                        " 'password' = '',\n" +
                        " 'sink.properties.format' = 'csv',\n" +
                        " 'sink.label-prefix' = 'doris_label4" + UUID.randomUUID() + "'" +
                        ")");
                        //
                        tEnv.executeSql("INSERT INTO doris_test_sink select * from source_doris");
                        }
                        }

                        文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                        评论