A Detailed Look at Connecting Flink to Doris

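The Flink Doris Connector lets Flink operate on (read, insert, update, delete) data stored in Doris. This article shows how to work with Doris from Flink through both the DataStream API and SQL.
Updates and deletes are supported only on the Unique Key model.
Deletes are currently applied automatically only when data is ingested through Flink CDC. With any other ingestion method, you have to implement deletion yourself.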

01
Maven
Add the flink-doris-connector dependency:
<!-- flink-doris-connector -->
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.16</artifactId>
    <version>1.4.0</version>
</dependency>
Replace the Connector and Flink dependency versions with the ones that match your Flink version.
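The artifact in the dependency above carries the Flink major.minor version as a suffix (`flink-doris-connector-1.16`). As a minimal sketch, assuming that naming pattern also holds for the Flink version you run, the artifact ID can be derived as shown below. The helper `artifactIdFor` is hypothetical and does not check whether such an artifact is actually published.

```java
class DorisConnectorArtifact {

    // Builds "flink-doris-connector-<major>.<minor>" from a full Flink version string.
    // Assumption: the connector follows the suffix pattern seen in the dependency above.
    static String artifactIdFor(String flinkVersion) {
        String[] parts = flinkVersion.trim().split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected a version like 1.16.2, got: " + flinkVersion);
        }
        return "flink-doris-connector-" + parts[0] + "." + parts[1];
    }

    public static void main(String[] args) {
        System.out.println(artifactIdFor("1.16.2"));
    }
}
```

The patch component of the Flink version is dropped on purpose, since the artifact name in the dependency only encodes major and minor.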
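02
Build
To build the connector, run sh build.sh directly.
After a successful build, the target JAR is generated in the dist directory, for example flink-doris-connector-1.5.0-SNAPSHOT.jar. Copy this file into Flink's classpath to use the Flink-Doris-Connector. For Flink running in Local mode, put the file in the lib/ folder. For Flink running in Yarn cluster mode, put it in the pre-deployment package.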
01
Read
SQL
-- doris source
CREATE TABLE flink_doris_source (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
) WITH (
    'connector' = 'doris',
    'fenodes' = 'FE_IP:HTTP_PORT',
    'table.identifier' = 'database.table',
    'username' = 'root',
    'password' = 'password'
);
DataStream
DorisOptions.Builder builder = DorisOptions.builder()
        .setFenodes("FE_IP:HTTP_PORT")
        .setTableIdentifier("db.table")
        .setUsername("root")
        .setPassword("password");

DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
        .setDorisOptions(builder.build())
        .setDorisReadOptions(DorisReadOptions.builder().build())
        .setDeserializer(new SimpleListDeserializationSchema())
        .build();

env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
02
Write
SQL
-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';

-- doris sink
CREATE TABLE flink_doris_sink (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
) WITH (
    'connector' = 'doris',
    'fenodes' = 'FE_IP:HTTP_PORT',
    'table.identifier' = 'db.table',
    'username' = 'root',
    'password' = 'password',
    'sink.label-prefix' = 'doris_label'
);

-- submit insert job
INSERT INTO flink_doris_sink SELECT name, age, price, sale FROM flink_doris_source;
DataStream
// enable checkpoint
env.enableCheckpointing(10000);
// using batch mode for bounded data
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

DorisSink.Builder<String> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("FE_IP:HTTP_PORT")
        .setTableIdentifier("db.table")
        .setUsername("root")
        .setPassword("password");

DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris") // streamload label prefix
        .setDeletable(false);

builder.setDorisReadOptions(DorisReadOptions.builder().build())
        .setDorisExecutionOptions(executionBuilder.build())
        .setSerializer(new SimpleStringSerializer()) // serialize according to string
        .setDorisOptions(dorisBuilder.build());

// mock string source
List<Tuple2<String, Integer>> data = new ArrayList<>();
data.add(new Tuple2<>("doris", 1));
DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data);

source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "\t" + t.f1)
        .sinkTo(builder.build());
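The map step in the example above turns each Tuple2 into one tab-separated line, which lines up with the default column separator (`\t`) that Doris Stream Load uses for CSV data. A dependency-free sketch of that row format follows. The `toRow` helper is hypothetical, and it assumes field values contain no tabs or newlines.

```java
import java.util.StringJoiner;

class TabSeparatedRow {

    // Joins field values with a tab, mirroring t.f0 + "\t" + t.f1 for any number of columns.
    // Assumption: values contain no tab or newline characters, so no escaping is done.
    static String toRow(Object... fields) {
        StringJoiner joiner = new StringJoiner("\t");
        for (Object field : fields) {
            joiner.add(String.valueOf(field));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        System.out.println(toRow("doris", 1));
    }
}
```

The column order in each line has to match the column order of the target Doris table unless a column mapping is configured in the Stream Load properties.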
RowData stream (RowDataSerializer)
// enable checkpoint
env.enableCheckpointing(10000);
// using batch mode for bounded data
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

// doris sink option
DorisSink.Builder<RowData> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("FE_IP:HTTP_PORT")
        .setTableIdentifier("db.table")
        .setUsername("root")
        .setPassword("password");

// json format to streamload
Properties properties = new Properties();
properties.setProperty("format", "json");
properties.setProperty("read_json_by_line", "true");

DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris") // streamload label prefix
        .setDeletable(false)
        .setStreamLoadProp(properties); // streamload params

// flink rowdata's schema
String[] fields = {"city", "longitude", "latitude", "destroy_date"};
DataType[] types = {DataTypes.VARCHAR(256), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DATE()};

builder.setDorisReadOptions(DorisReadOptions.builder().build())
        .setDorisExecutionOptions(executionBuilder.build())
        .setSerializer(RowDataSerializer.builder() // serialize according to rowdata
                .setFieldNames(fields)
                .setType("json") // json format
                .setFieldType(types)
                .build())
        .setDorisOptions(dorisBuilder.build());

// mock rowdata source
DataStream<RowData> source = env.fromElements("")
        .map(new MapFunction<String, RowData>() {
            @Override
            public RowData map(String value) throws Exception {
                GenericRowData genericRowData = new GenericRowData(4);
                genericRowData.setField(0, StringData.fromString("beijing"));
                genericRowData.setField(1, 116.405419);
                genericRowData.setField(2, 39.916927);
                // Flink stores DATE internally as an int (days since 1970-01-01), so cast the long
                genericRowData.setField(3, (int) LocalDate.now().toEpochDay());
                return genericRowData;
            }
        });

source.sinkTo(builder.build());
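In the mock source above, the DATE field is set as a day count rather than as a LocalDate, because Flink's internal RowData representation stores DATE as an int number of days since 1970-01-01. A small standard-library sketch of that conversion follows. The class and method names are illustrative and not part of Flink or the connector.

```java
import java.time.LocalDate;

class EpochDayDemo {

    // Converts a calendar date to the int day count Flink uses internally for DATE.
    // Math.toIntExact fails loudly instead of silently truncating out-of-range values.
    static int toFlinkDate(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }

    // Converts the internal day count back to a calendar date.
    static LocalDate fromFlinkDate(int daysSinceEpoch) {
        return LocalDate.ofEpochDay(daysSinceEpoch);
    }

    public static void main(String[] args) {
        int days = toFlinkDate(LocalDate.of(2023, 5, 17));
        System.out.println(days + " -> " + fromFlinkDate(days));
    }
}
```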
SchemaChange stream (JsonDebeziumSchemaSerializer)
// enable checkpoint
env.enableCheckpointing(10000);

Properties props = new Properties();
props.setProperty("format", "json");
props.setProperty("read_json_by_line", "true");

DorisOptions dorisOptions = DorisOptions.builder()
        .setFenodes("127.0.0.1:8030")
        .setTableIdentifier("test.t1")
        .setUsername("root")
        .setPassword("")
        .build();

DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-prefix")
        .setStreamLoadProp(props)
        .setDeletable(true); // propagate delete events to Doris

DorisSink.Builder<String> builder = DorisSink.builder();
builder.setDorisReadOptions(DorisReadOptions.builder().build())
        .setDorisExecutionOptions(executionBuilder.build())
        .setDorisOptions(dorisOptions)
        .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());

// mySqlSource is not defined in this snippet; it is assumed to be a Flink CDC MySQL source built elsewhere
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
        .sinkTo(builder.build());
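With setDeletable(true), delete events from the CDC stream are propagated to Doris. The sketch below is not the connector's code. It only illustrates the idea, under the assumption that each change event carries a Debezium-style op code (c for create, r for snapshot read, u for update, d for delete) and that a delete is signalled to a Unique Key table through a delete marker. The `deleteSign` helper and its "0"/"1" return values are illustrative.

```java
class DeleteSignSketch {

    // Maps a Debezium-style op code to a delete marker: "1" means delete the row,
    // "0" means upsert it. Assumption: only the four op codes below occur.
    static String deleteSign(String op) {
        switch (op) {
            case "c": // create
            case "r": // snapshot read
            case "u": // update
                return "0";
            case "d": // delete
                return "1";
            default:
                throw new IllegalArgumentException("Unsupported op code: " + op);
        }
    }

    public static void main(String[] args) {
        for (String op : new String[] {"c", "r", "u", "d"}) {
            System.out.println(op + " -> " + deleteSign(op));
        }
    }
}
```

This is also why deletes require the Unique Key model: the marker is only meaningful when rows are identified by a key and a later record can replace or remove an earlier one.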
This article is reposted from 大数据技能圈.




