Flink 连接 Doris

Flink 可以通过多种方式连接 Doris。下面是我整理的 11 个 demo 代码，亲测有效，建议收藏~
package org.apache.doris.flink;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.JsonDebeziumSchemaSerializer;
import org.apache.doris.flink.utils.DateToStringConverter;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

/**
 * Streams MySQL CDC events (initial snapshot plus binlog) into Doris as JSON,
 * forwarding upstream schema changes so the Doris table follows the source.
 */
public class CDCSchemaChangeExample {

    public static void main(String[] args) throws Exception {
        // Render DECIMAL values as plain numbers instead of base64-encoded bytes.
        Map<String, Object> converterConfig = new HashMap<>();
        converterConfig.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
        JsonDebeziumDeserializationSchema deserializer =
                new JsonDebeziumDeserializationSchema(false, converterConfig);

        MySqlSource<String> mySqlSource =
                MySqlSource.<String>builder()
                        .hostname("127.0.0.1")
                        .port(3306)
                        .databaseList("test") // captured database
                        .tableList("test.t1") // captured table
                        .username("root")
                        .password("123456")
                        .debeziumProperties(DateToStringConverter.DEFAULT_PROPS)
                        .deserializer(deserializer)
                        .serverTimeZone("Asia/Shanghai")
                        .includeSchemaChanges(true) // converts SourceRecord to JSON String
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Checkpointing drives the Doris sink's transactional stream loads.
        env.enableCheckpointing(10000);

        Properties loadProps = new Properties();
        loadProps.setProperty("format", "json");
        loadProps.setProperty("read_json_by_line", "true");

        DorisOptions dorisOptions =
                DorisOptions.builder()
                        .setFenodes("127.0.0.1:8030")
                        .setTableIdentifier("test.t1")
                        .setUsername("root")
                        .setPassword("")
                        .build();

        // Random label prefix keeps stream-load labels unique across job restarts.
        DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
        executionBuilder
                .setLabelPrefix("label-doris" + UUID.randomUUID())
                .setStreamLoadProp(loadProps)
                .setDeletable(true);

        DorisSink.Builder<String> sinkBuilder = DorisSink.builder();
        sinkBuilder
                .setDorisReadOptions(DorisReadOptions.builder().build())
                .setDorisExecutionOptions(executionBuilder.build())
                .setDorisOptions(dorisOptions)
                .setSerializer(
                        JsonDebeziumSchemaSerializer.builder()
                                .setDorisOptions(dorisOptions)
                                .setNewSchemaChange(true)
                                .build());

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .sinkTo(sinkBuilder.build());
        env.execute("Print MySQL Snapshot + Binlog");
    }
}
package org.apache.doris.flink;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

import java.util.UUID;

/**
 * Batch Table-API job: reads TIMESTAMP rows from a Doris source table and
 * writes them to a Doris sink table with the timestamp truncated to a DATE.
 */
public class DorisDateAndTimestampSqlTest {

    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // Doris-backed source table (replace FE_HOST:FE_PORT with a real FE address).
        tableEnv.executeSql(
                "create table test_source ( "
                        + " id INT, "
                        + " score DECIMAL(10, 9), "
                        + " submit_time TIMESTAMP "
                        + " ) with ( "
                        + " 'password'='', "
                        + " 'connector'='doris', "
                        + " 'fenodes'='FE_HOST:FE_PORT', "
                        + " 'table.identifier'='db.source_table', "
                        + " 'username'='root' "
                        + ")");

        // Sink table; random label prefix keeps stream-load labels unique per run.
        tableEnv.executeSql(
                "create table test_sink ( "
                        + " id INT, "
                        + " score DECIMAL(10, 9), "
                        + " submit_time DATE "
                        + " ) with ( "
                        + " 'password'='', "
                        + " 'connector'='doris', "
                        + " 'fenodes'='FE_HOST:FE_PORT', "
                        + " 'sink.label-prefix' = 'label_" + UUID.randomUUID() + "' , "
                        + " 'table.identifier'='db.sink_table', "
                        + " 'username'='root' "
                        + ")");

        // Convert TIMESTAMP to DATE on the fly and keep only recent rows.
        tableEnv.executeSql(
                        "insert into "
                                + " test_sink "
                                + "select "
                                + " id, "
                                + " score,"
                                + " to_date(DATE_FORMAT(submit_time, 'yyyy-MM-dd')) as submit_time "
                                + "from "
                                + " test_source "
                                + "where "
                                + " submit_time>='2022-05-31 00:00:00'")
                .print();
    }
}
public class DorisIntranetAccessSinkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.setRuntimeMode(RuntimeExecutionMode.BATCH);env.enableCheckpointing(10000);env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000)));DorisSink.Builder<String> builder = DorisSink.builder();final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();readOptionBuilder.setDeserializeArrowAsync(false).setDeserializeQueueSize(64).setExecMemLimit(2147483648L).setRequestQueryTimeoutS(3600).setRequestBatchSize(1000).setRequestConnectTimeoutMs(10000).setRequestReadTimeoutMs(10000).setRequestRetries(3).setRequestTabletSize(1024 * 1024);Properties properties = new Properties();properties.setProperty("column_separator", ",");properties.setProperty("line_delimiter", "\n");properties.setProperty("format", "csv");DorisOptions.Builder dorisBuilder = DorisOptions.builder();dorisBuilder.setFenodes("10.20.30.1:8030").setBenodes("10.20.30.1:8040, 10.20.30.2:8040, 10.20.30.3:8040").setTableIdentifier("test.test_sink").setUsername("root").setPassword("");DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();executionBuilder.disable2PC().setLabelPrefix("label-doris").setStreamLoadProp(properties).setBufferSize(8 * 1024).setBufferCount(3);builder.setDorisReadOptions(readOptionBuilder.build()).setDorisExecutionOptions(executionBuilder.build()).setSerializer(new SimpleStringSerializer()).setDorisOptions(dorisBuilder.build());List<Tuple2<Integer, String>> data = new ArrayList<>();data.add(new Tuple2<>(1, "zhangsan"));data.add(new Tuple2<>(2, "lisi"));data.add(new Tuple2<>(3, "wangwu"));DataStreamSource<Tuple2<Integer, String>> source = 
env.fromCollection(data);source.map((MapFunction<Tuple2<Integer, String>, String>) t -> t.f0 + "," + t.f1).sinkTo(builder.build());env.execute("doris test");}}
package org.apache.doris.flink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.UUID;

/**
 * Round-trips ARRAY-typed columns: generates rows with datagen, maps them
 * through a Doris-backed table, and inserts into a Doris sink table. DATE and
 * TIMESTAMP arrays are declared as ARRAY&lt;STRING&gt; on the Doris side.
 */
public class DorisSinkArraySQLExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Bounded datagen source covering the array element types under test.
        tableEnv.executeSql(
                "CREATE TABLE source (\n"
                        + " `id` int,\n"
                        + " `c_1` array<INT> ,\n"
                        + " `c_2` array<TINYINT> ,\n"
                        + " `c_3` array<SMALLINT> ,\n"
                        + " `c_4` array<INT> ,\n"
                        + " `c_5` array<BIGINT> ,\n"
                        + " `c_6` array<BIGINT> ,\n"
                        + " `c_7` array<FLOAT>,\n"
                        + " `c_8` array<DOUBLE> ,\n"
                        + " `c_9` array<DECIMAL(4,2)> ,\n"
                        + " `c_10` array<DATE> ,\n"
                        + " `c_11` array<DATE> ,\n"
                        + " `c_12` array<TIMESTAMP> ,\n"
                        + " `c_13` array<TIMESTAMP> ,\n"
                        + " `c_14` array<CHAR(10)> ,\n"
                        + " `c_15` array<VARCHAR(256)> ,\n"
                        + " `c_16` array<STRING> \n"
                        + ") WITH (\n"
                        + " 'connector' = 'datagen', \n"
                        + " 'fields.c_7.element.min' = '1', \n"
                        + " 'fields.c_7.element.max' = '10', \n"
                        + " 'fields.c_8.element.min' = '1', \n"
                        + " 'fields.c_8.element.max' = '10', \n"
                        + " 'fields.c_14.element.length' = '10', \n"
                        + " 'fields.c_15.element.length' = '10', \n"
                        + " 'fields.c_16.element.length' = '10', \n"
                        + " 'number-of-rows' = '5' \n"
                        + ");");

        // Doris-backed table; temporal arrays surface as ARRAY<STRING>.
        tableEnv.executeSql(
                "CREATE TABLE source_doris ("
                        + " `id` int,\n"
                        + " `c_1` array<INT> ,\n"
                        + " `c_2` array<TINYINT> ,\n"
                        + " `c_3` array<SMALLINT> ,\n"
                        + " `c_4` array<INT> ,\n"
                        + " `c_5` array<BIGINT> ,\n"
                        + " `c_6` array<STRING> ,\n"
                        + " `c_7` array<FLOAT> ,\n"
                        + " `c_8` array<DOUBLE> ,\n"
                        + " `c_9` array<DECIMAL(4,2)> ,\n"
                        + " `c_10` array<STRING> ,\n" // ARRAY<DATE>
                        + " `c_11` array<STRING> ,\n" // ARRAY<DATE>
                        + " `c_12` array<STRING> ,\n" // ARRAY<TIMESTAMP>
                        + " `c_13` array<STRING> ,\n" // ARRAY<TIMESTAMP>
                        + " `c_14` array<CHAR(10)> ,\n"
                        + " `c_15` array<VARCHAR(256)> ,\n"
                        + " `c_16` array<STRING> \n"
                        + ") WITH ("
                        + " 'connector' = 'doris',\n"
                        + " 'fenodes' = '127.0.0.1:8030',\n"
                        + " 'table.identifier' = 'test.array_test_type',\n"
                        + " 'username' = 'root',\n"
                        + " 'password' = ''\n"
                        + ")");

        // Doris sink; random label prefix keeps stream-load labels unique.
        tableEnv.executeSql(
                "CREATE TABLE sink ("
                        + " `id` int,\n"
                        + " `c_1` array<INT> ,\n"
                        + " `c_2` array<TINYINT> ,\n"
                        + " `c_3` array<SMALLINT> ,\n"
                        + " `c_4` array<INT> ,\n"
                        + " `c_5` array<BIGINT> ,\n"
                        + " `c_6` array<STRING> ,\n"
                        + " `c_7` array<FLOAT> ,\n"
                        + " `c_8` array<DOUBLE> ,\n"
                        + " `c_9` array<DECIMAL(4,2)> ,\n"
                        + " `c_10` array<STRING> ,\n" // ARRAY<DATE>
                        + " `c_11` array<STRING> ,\n" // ARRAY<DATE>
                        + " `c_12` array<STRING> ,\n" // ARRAY<TIMESTAMP>
                        + " `c_13` array<STRING> ,\n" // ARRAY<TIMESTAMP>
                        + " `c_14` array<CHAR(10)> ,\n"
                        + " `c_15` array<VARCHAR(256)> ,\n"
                        + " `c_16` array<STRING> \n"
                        + ") "
                        + "WITH (\n"
                        + " 'connector' = 'doris',\n"
                        + " 'fenodes' = '127.0.0.1:8030',\n"
                        + " 'table.identifier' = 'test.array_test_type_sink',\n"
                        + " 'username' = 'root',\n"
                        + " 'password' = '',\n"
                        + " 'sink.label-prefix' = 'doris_label4" + UUID.randomUUID() + "'"
                        + ")");

        tableEnv.executeSql("INSERT INTO sink select * from source_doris");
    }
}
package org.apache.doris.flink;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.batch.DorisBatchSink;
import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Arrays;
import java.util.Properties;
import java.util.UUID;

/**
 * Demonstrates {@link DorisBatchSink}: buffered, non-transactional batch
 * loading driven by byte-count, row-count, and interval flush thresholds.
 */
public class DorisSinkBatchExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(5000);

        DorisBatchSink.Builder<String> sinkBuilder = DorisBatchSink.builder();

        final DorisReadOptions.Builder readOptions = DorisReadOptions.builder();
        readOptions
                .setDeserializeArrowAsync(false)
                .setDeserializeQueueSize(64)
                .setExecMemLimit(2147483648L)
                .setRequestQueryTimeoutS(3600)
                .setRequestBatchSize(1000)
                .setRequestConnectTimeoutMs(10000)
                .setRequestReadTimeoutMs(10000)
                .setRequestRetries(3)
                .setRequestTabletSize(1024 * 1024);

        // Stream-load format: comma-separated CSV, one record per line.
        Properties loadProps = new Properties();
        loadProps.setProperty("column_separator", ",");
        loadProps.setProperty("line_delimiter", "\n");
        loadProps.setProperty("format", "csv");

        DorisOptions.Builder connectionOptions = DorisOptions.builder();
        connectionOptions
                .setFenodes("127.0.0.1:8030")
                .setTableIdentifier("test.test_flink")
                .setUsername("root")
                .setPassword("");

        // Flush whenever 8 KiB, 900 rows, or 10 s is reached — whichever first.
        DorisExecutionOptions.Builder executionOptions = DorisExecutionOptions.builder();
        executionOptions
                .setLabelPrefix("label")
                .setStreamLoadProp(loadProps)
                .setDeletable(false)
                .setBufferFlushMaxBytes(8 * 1024)
                .setBufferFlushMaxRows(900)
                .setBufferFlushIntervalMs(1000 * 10);

        sinkBuilder
                .setDorisReadOptions(readOptions.build())
                .setDorisExecutionOptions(executionOptions.build())
                .setSerializer(new SimpleStringSerializer())
                .setDorisOptions(connectionOptions.build());

        // Unbounded demo source: emits one "id,uuid,id" CSV record every 500 ms.
        env.addSource(
                        new SourceFunction<String>() {
                            private Long id = 0L;

                            @Override
                            public void run(SourceContext<String> out) throws Exception {
                                while (true) {
                                    id = id + 1;
                                    String record = id + "," + UUID.randomUUID() + "," + id;
                                    out.collect(record);
                                    Thread.sleep(500);
                                }
                            }

                            @Override
                            public void cancel() {}
                        })
                .sinkTo(sinkBuilder.build());
        env.execute("doris batch test");
    }

    /** Row-count flush demo: max-rows of 1 forces a load per record. */
    public void testBatchFlush() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DorisBatchSink.Builder<String> sinkBuilder = DorisBatchSink.builder();

        final DorisReadOptions.Builder readOptions = DorisReadOptions.builder();
        readOptions
                .setDeserializeArrowAsync(false)
                .setDeserializeQueueSize(64)
                .setExecMemLimit(2147483648L)
                .setRequestQueryTimeoutS(3600)
                .setRequestBatchSize(1000)
                .setRequestConnectTimeoutMs(10000)
                .setRequestReadTimeoutMs(10000)
                .setRequestRetries(3)
                .setRequestTabletSize(1024 * 1024);

        Properties loadProps = new Properties();
        loadProps.setProperty("column_separator", ",");
        loadProps.setProperty("line_delimiter", "\n");
        loadProps.setProperty("format", "csv");

        DorisOptions.Builder connectionOptions = DorisOptions.builder();
        connectionOptions
                .setFenodes("127.0.0.1:8030")
                .setTableIdentifier("test.testd")
                .setUsername("root")
                .setPassword("");

        DorisExecutionOptions.Builder executionOptions = DorisExecutionOptions.builder();
        executionOptions
                .setLabelPrefix("label")
                .setStreamLoadProp(loadProps)
                .setDeletable(false)
                .setBufferFlushMaxBytes(8 * 1024)
                .setBufferFlushMaxRows(1)
                .setBufferFlushIntervalMs(1000 * 10);

        sinkBuilder
                .setDorisReadOptions(readOptions.build())
                .setDorisExecutionOptions(executionOptions.build())
                .setSerializer(new SimpleStringSerializer())
                .setDorisOptions(connectionOptions.build());

        DataStreamSource<String> records =
                env.fromCollection(
                        Arrays.asList(
                                "1,-74159.9193252453",
                                "2,-74159.9193252453",
                                "3,-19.7004480979",
                                "4,43385.2170333507",
                                "5,-16.2602598554"));
        records.sinkTo(sinkBuilder.build());
        env.execute("doris batch test");
    }
}
package org.apache.doris.flink;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * Minimal DataStream-to-Doris example: maps a one-element collection to CSV
 * strings and writes them with the exactly-once {@link DorisSink}.
 *
 * <p>Fix: fenodes must point at the FE HTTP port (default 8030); the original
 * used 8040, which is the BE webserver port used by the other demos'
 * {@code setBenodes} — TODO confirm against the target cluster's ports.
 */
public class DorisSinkExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // Checkpointing is required for the sink's two-phase-commit loads.
        env.enableCheckpointing(10000);
        env.getCheckpointConfig()
                .enableExternalizedCheckpoints(
                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000)));

        DorisSink.Builder<String> builder = DorisSink.builder();

        final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
        readOptionBuilder
                .setDeserializeArrowAsync(false)
                .setDeserializeQueueSize(64)
                .setExecMemLimit(2147483648L)
                .setRequestQueryTimeoutS(3600)
                .setRequestBatchSize(1000)
                .setRequestConnectTimeoutMs(10000)
                .setRequestReadTimeoutMs(10000)
                .setRequestRetries(3)
                .setRequestTabletSize(1024 * 1024);

        // Stream-load format: comma-separated CSV, one record per line.
        Properties properties = new Properties();
        properties.setProperty("column_separator", ",");
        properties.setProperty("line_delimiter", "\n");
        properties.setProperty("format", "csv");

        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
        dorisBuilder
                .setFenodes("127.0.0.1:8030") // FE HTTP port (was 8040, the BE port)
                .setTableIdentifier("db.table")
                .setUsername("test")
                .setPassword("test");

        DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
        executionBuilder
                .setLabelPrefix("label-doris")
                .setStreamLoadProp(properties)
                .setBufferSize(8 * 1024)
                .setBufferCount(3);

        builder.setDorisReadOptions(readOptionBuilder.build())
                .setDorisExecutionOptions(executionBuilder.build())
                .setSerializer(new SimpleStringSerializer())
                .setDorisOptions(dorisBuilder.build());

        List<Tuple2<String, Integer>> data = new ArrayList<>();
        data.add(new Tuple2<>("doris", 1));
        DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data);
        source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "," + t.f1)
                .sinkTo(builder.build());
        env.execute("doris test");
    }
}
package org.apache.doris.flink;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.LoadConstants;
import org.apache.doris.flink.sink.writer.RowDataSerializer;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Collector;

import java.util.Properties;
import java.util.UUID;

/**
 * Writes Flink internal {@link RowData} records to Doris using
 * {@link RowDataSerializer} to turn rows into CSV according to a declared
 * field-name/field-type schema.
 */
public class DorisSinkExampleRowData {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.enableCheckpointing(10000);
        env.setParallelism(1);
        env.getCheckpointConfig()
                .enableExternalizedCheckpoints(
                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000)));

        DorisSink.Builder<RowData> sinkBuilder = DorisSink.builder();

        // CSV stream-load format; switch to json/read_json_by_line if needed.
        Properties loadProps = new Properties();
        loadProps.setProperty("column_separator", ",");
        loadProps.setProperty("line_delimiter", "\n");

        DorisOptions.Builder connectionOptions = DorisOptions.builder();
        connectionOptions
                .setFenodes("127.0.0.1:8030")
                .setTableIdentifier("db.tbl")
                .setUsername("root")
                .setPassword("");

        DorisExecutionOptions.Builder executionOptions = DorisExecutionOptions.builder();
        executionOptions.setLabelPrefix(UUID.randomUUID().toString()).setStreamLoadProp(loadProps);

        // Schema of the incoming RowData: (name VARCHAR(256), age INT).
        String[] fieldNames = {"name", "age"};
        DataType[] fieldTypes = {DataTypes.VARCHAR(256), DataTypes.INT()};

        sinkBuilder
                .setDorisExecutionOptions(executionOptions.build())
                .setSerializer(
                        RowDataSerializer.builder() // serialize according to RowData
                                .setType(LoadConstants.CSV)
                                .setFieldDelimiter(",")
                                .setFieldNames(fieldNames)
                                .setFieldType(fieldTypes)
                                .build())
                .setDorisOptions(connectionOptions.build());

        // Mock RowData source: two hard-coded (city, number) rows.
        DataStream<RowData> source =
                env.fromElements("")
                        .flatMap(
                                new FlatMapFunction<String, RowData>() {
                                    @Override
                                    public void flatMap(String ignored, Collector<RowData> out)
                                            throws Exception {
                                        GenericRowData first = new GenericRowData(2);
                                        first.setField(0, StringData.fromString("beijing"));
                                        first.setField(1, 123);
                                        out.collect(first);

                                        GenericRowData second = new GenericRowData(2);
                                        second.setField(0, StringData.fromString("shanghai"));
                                        second.setField(1, 1234);
                                        out.collect(second);
                                    }
                                });
        source.sinkTo(sinkBuilder.build());
        env.execute("doris test");
    }
}
package org.apache.doris.flink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.table.api.Expressions.$;

/**
 * SQL sink example: registers an in-memory DataStream as a temporary view and
 * inserts it into a Doris table declared with the 'doris' SQL connector.
 */
public class DorisSinkSQLExample {

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        List<Tuple2<String, Integer>> rows = new ArrayList<>();
        rows.add(new Tuple2<>("doris", 1));
        DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(rows);

        // Expose the stream to SQL as (name, age).
        tableEnv.createTemporaryView("doris_test", source, $("name"), $("age"));

        // Doris sink table using JSON stream-load (replace FE_IP with a real FE).
        tableEnv.executeSql(
                "CREATE TABLE doris_test_sink ("
                        + "name STRING,"
                        + "age INT"
                        + ") "
                        + "WITH (\n"
                        + " 'connector' = 'doris',\n"
                        + " 'fenodes' = 'FE_IP:8030',\n"
                        + " 'table.identifier' = 'db.table',\n"
                        + " 'username' = 'root',\n"
                        + " 'password' = '',\n"
                        + " 'sink.properties.format' = 'json',\n"
                        + " 'sink.buffer-count' = '4',\n"
                        + " 'sink.buffer-size' = '4086',"
                        + " 'sink.label-prefix' = 'doris_label',\n"
                        + " 'sink.properties.read_json_by_line' = 'true'\n"
                        + ")");

        tableEnv.executeSql("INSERT INTO doris_test_sink select name,age from doris_test");
    }
}
package org.apache.doris.flink;

import org.apache.doris.flink.cfg.DorisStreamOptions;
import org.apache.doris.flink.datastream.DorisSourceFunction;
import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * DataStream source example: reads selected columns from a Doris table with a
 * push-down filter and prints each record.
 */
public class DorisSourceDataStream {

    public static void main(String[] args) throws Exception {
        // Connection plus read options (column projection and filter push-down).
        Properties config = new Properties();
        config.put("fenodes", "FE_IP:8030");
        config.put("username", "root");
        config.put("password", "");
        config.put("table.identifier", "db.table");
        config.put("doris.read.field", "id,code,name");
        config.put("doris.filter.query", "name='doris'");
        DorisStreamOptions options = new DorisStreamOptions(config);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.addSource(new DorisSourceFunction(options, new SimpleListDeserializationSchema()))
                .print();
        env.execute("Flink doris test");
    }
}
package org.apache.doris.flink;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class DorisSourceExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// register a table in the catalogtEnv.executeSql("CREATE TABLE doris_source (" +"bigint_1 BIGINT," +"char_1 STRING," +"date_1 STRING," +"datetime_1 STRING," +"decimal_1 DECIMAL(5,2)," +"double_1 DOUBLE," +"float_1 FLOAT ," +"int_1 INT ," +"largeint_1 STRING, " +"smallint_1 SMALLINT, " +"tinyint_1 TINYINT, " +"varchar_1 STRING " +") " +"WITH (\n" +" 'connector' = 'doris',\n" +" 'fenodes' = 'FE_IP:8030',\n" +" 'table.identifier' = 'db.table',\n" +" 'username' = 'root',\n" +" 'password' = ''\n" +")");// define a dynamic aggregating queryfinal Table result = tEnv.sqlQuery("SELECT * from doris_source ");// print the result to the consoletEnv.toDataStream(result).print();env.execute();}}
package org.apache.doris.flink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.UUID;

/**
 * All-types round-trip demo: prints one datagen row, then declares matching
 * Doris source and sink tables (the final INSERT is left commented out).
 *
 * <p>Fix: the datagen table's WITH clause listed {@code 'connector' =
 * 'datagen'} twice; Flink rejects duplicate table options, so the duplicate
 * entry is removed.
 */
public class DorisSourceSinkExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Bounded datagen table covering the column types under test.
        tEnv.executeSql(
                "CREATE TABLE doris_test ("
                        + " id int,\n"
                        + " c_1 boolean,\n"
                        + " c_2 tinyint,\n"
                        + " c_3 smallint,\n"
                        + " c_4 int,\n"
                        + " c_5 bigint,\n"
                        + " c_6 bigint,\n"
                        + " c_7 float,\n"
                        + " c_8 double,\n"
                        + " c_9 DECIMAL(4,2),\n"
                        + " c_10 DECIMAL(4,1),\n"
                        + " c_11 date,\n"
                        + " c_12 date,\n"
                        + " c_13 timestamp,\n"
                        + " c_14 timestamp,\n"
                        + " c_15 string,\n"
                        + " c_16 string,\n"
                        + " c_17 string,\n"
                        + " c_18 array<int>,\n"
                        + " c_19 Map<String,int>\n"
                        + ") "
                        + "WITH (\n"
                        + " 'connector' = 'datagen', \n"
                        + " 'fields.c_6.max' = '5', \n"
                        + " 'fields.c_9.max' = '5', \n"
                        + " 'fields.c_10.max' = '5', \n"
                        + " 'fields.c_15.length' = '5', \n"
                        + " 'fields.c_16.length' = '5', \n"
                        + " 'fields.c_17.length' = '5', \n"
                        + " 'fields.c_19.key.length' = '5', \n"
                        // duplicate 'connector' option removed here
                        + " 'number-of-rows' = '1' \n"
                        + ")");

        // Print the generated row to the console.
        final Table result = tEnv.sqlQuery("SELECT * from doris_test ");
        tEnv.toRetractStream(result, Row.class).print();
        env.execute();

        // Doris source table mirroring the same schema (c_6/c_19 as string).
        tEnv.executeSql(
                "CREATE TABLE source_doris ("
                        + " id int,\n"
                        + " c_1 boolean,\n"
                        + " c_2 tinyint,\n"
                        + " c_3 smallint,\n"
                        + " c_4 int,\n"
                        + " c_5 bigint,\n"
                        + " c_6 string,\n"
                        + " c_7 float,\n"
                        + " c_8 double,\n"
                        + " c_9 DECIMAL(4,2),\n"
                        + " c_10 DECIMAL(4,1),\n"
                        + " c_11 date,\n"
                        + " c_12 date,\n"
                        + " c_13 timestamp,\n"
                        + " c_14 timestamp,\n"
                        + " c_15 string,\n"
                        + " c_16 string,\n"
                        + " c_17 string,\n"
                        + " c_18 array<int>,\n"
                        + " c_19 string\n"
                        + ") "
                        + "WITH (\n"
                        + " 'connector' = 'doris',\n"
                        + " 'fenodes' = '127.0.0.1:8030',\n"
                        + " 'table.identifier' = 'test.test_all_type',\n"
                        + " 'username' = 'root',\n"
                        + " 'password' = ''\n"
                        + ")");

        // Doris sink table; random label prefix keeps stream-load labels unique.
        tEnv.executeSql(
                "CREATE TABLE doris_test_sink ("
                        + " id int,\n"
                        + " c_1 boolean,\n"
                        + " c_2 tinyint,\n"
                        + " c_3 smallint,\n"
                        + " c_4 int,\n"
                        + " c_5 bigint,\n"
                        + " c_6 string,\n"
                        + " c_7 float,\n"
                        + " c_8 double,\n"
                        + " c_9 DECIMAL(4,2),\n"
                        + " c_10 DECIMAL(4,1),\n"
                        + " c_11 date,\n"
                        + " c_12 date,\n"
                        + " c_13 timestamp,\n"
                        + " c_14 timestamp,\n"
                        + " c_15 string,\n"
                        + " c_16 string,\n"
                        + " c_17 string,\n"
                        + " c_18 array<int>,\n"
                        + " c_19 string\n"
                        + ") "
                        + "WITH (\n"
                        + " 'connector' = 'doris',\n"
                        + " 'fenodes' = '127.0.0.1:8030',\n"
                        + " 'table.identifier' = 'test.test_all_type_sink',\n"
                        + " 'username' = 'root',\n"
                        + " 'password' = '',\n"
                        + " 'sink.properties.format' = 'csv',\n"
                        + " 'sink.label-prefix' = 'doris_label4" + UUID.randomUUID() + "'"
                        + ")");

        // Enable to actually copy the Doris source into the sink:
        // tEnv.executeSql("INSERT INTO doris_test_sink select * from source_doris");
    }
}
文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




