

Flink DataSet Data Output (Part 11)

01
Flink Sink: TextFile
A DataSet can be written to a file system as plain text using TextOutputFormat. The target can be either the local file system or HDFS; Flink identifies which one to use from the prefix of the path supplied by the user.
dataSet.writeAsText(local file path)
dataSet.writeAsText(HDFS file path)
writeAsText writes the DataSet directly to the specified file. When using the write-family methods, the user can also choose the write mode, either OVERWRITE or NO_OVERWRITE: the former overwrites any existing content at the target path, while the latter refuses to overwrite and fails the write if the target file already exists.
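For instance, both modes can be passed directly to writeAsText. A minimal sketch follows; the class name, the local path, and the HDFS namenode address are placeholders chosen for illustration, not values from the article:

//Java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;

public class WriteModeSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> dataSet = env.fromElements("spark", "hive", "hbase");

        // Local file system: the file:// prefix is recognized, and an existing file is replaced.
        dataSet.writeAsText("file:///tmp/flink-out/words.txt", FileSystem.WriteMode.OVERWRITE);

        // HDFS: the hdfs:// prefix routes the write to HDFS; NO_OVERWRITE fails if the path already exists.
        dataSet.writeAsText("hdfs://namenode:9000/flink/words.txt", FileSystem.WriteMode.NO_OVERWRITE);

        env.execute();
    }
}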
//Java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class TextSinkJavaDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> source = env.fromElements("spark hbase java", "java spark hive", "java hbase hbase");
        env.setParallelism(1);
        DataSink<String> result = source
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        for (String word : s.split(" ")) {
                            collector.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                        return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
                    }
                })
                // .writeAsText("file:///Users/zhangjingyu/Desktop/wordcount.txt", FileSystem.WriteMode.OVERWRITE);
                .writeAsFormattedText("file:///Users/zhangjingyu/Desktop/wordcount.txt",
                        new TextOutputFormat.TextFormatter<Tuple2<String, Integer>>() {
                            @Override
                            public String format(Tuple2<String, Integer> stringIntegerTuple2) {
                                return stringIntegerTuple2.f0 + "-" + stringIntegerTuple2.f1;
                            }
                        });
        env.execute();
    }
}

//Scala
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.util.Collector

object TextSinkScalaDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val source = env.fromElements("spark hbase java", "java spark hive", "java hbase hbase")
    env.setParallelism(1)
    val result = source
      .flatMap((line: String, collector: Collector[(String, Int)]) => {
        line.split(" ").foreach(word => collector.collect((word, 1)))
      })
      .groupBy(0)
      .reduceGroup(it => it.reduce((a, b) => (a._1, a._2 + b._2)))
      .writeAsText("file:///Users/zhangjingyu/Desktop/wordcount.txt", WriteMode.OVERWRITE)
    env.execute()
  }
}
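With the parallelism set to 1 the result lands in a single wordcount.txt file, and the TextFormatter above turns each record into a word-count line, e.g. java-3 and hbase-3 for the sample input.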

02
Flink Sink: CsvFile
This method writes the data set to the specified file system in CSV format, and the output call accepts CSV-specific settings such as the row delimiter and the field delimiter.
dataSet.writeAsCsv(local file path, row delimiter, field delimiter)
For example: dataSet.writeAsCsv("file://path/file", "\n", ",")
//Java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;

public class CsvSinkJavaDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> source = env.fromElements("spark hbase java", "java spark hive", "java hbase hbase");
        env.setParallelism(1);
        DataSink<Tuple2<String, Integer>> result = source
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        for (String word : s.split(" ")) {
                            collector.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                        return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
                    }
                })
                .writeAsCsv("file:///Users/zhangjingyu/Desktop/wordcount.csv", "\n", "|", FileSystem.WriteMode.OVERWRITE);
        env.execute();
    }
}

//Scala
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.util.Collector

object CsvSinkScalaDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val source = env.fromElements("spark hbase java", "java spark hive", "java hbase hbase")
    env.setParallelism(1)
    val result = source
      .flatMap((line: String, collector: Collector[(String, Int)]) => {
        line.split(" ").foreach(word => collector.collect((word, 1)))
      })
      .groupBy(0)
      .reduceGroup(it => it.reduce((a, b) => (a._1, a._2 + b._2)))
      .writeAsCsv("file:///Users/zhangjingyu/Desktop/wordcount.csv", "\n", "|", WriteMode.OVERWRITE)
    env.execute()
  }
}
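Here "\n" is the row delimiter and "|" the field delimiter, so each Tuple2 record becomes one line of the form word|count; for the sample input that gives lines such as java|3 and hbase|3.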

03
Flink Sink: MySQL
A DataSet can also be written to MySQL through JDBCOutputFormat, which is configured with the JDBC driver class, connection URL, credentials, and a parameterized INSERT statement:

//MySQL configuration parameters
dataSet.output(JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername("com.mysql.jdbc.Driver")
        .setDBUrl("jdbc:mysql://bigdata-pro-m01.kfk.com/flink")
        .setUsername("root")
        .setPassword("12345678")
        .setQuery("insert into wordcount (word, count) values (?,?)")
        .finish());
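Each upstream record must be a Row whose fields fill the ? placeholders of the INSERT statement in order, which is why the word-count example below converts its Tuple2 results to Row before calling output().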
//Java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class MysqlSinkJavaDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> source = env.fromElements("spark hbase java", "java spark hive", "java hbase hbase");
        DataSet<Row> result = source
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        for (String word : s.toUpperCase().split(" ")) {
                            collector.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                        return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
                    }
                })
                .map(new MapFunction<Tuple2<String, Integer>, Row>() {
                    @Override
                    public Row map(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                        return Row.of(stringIntegerTuple2.f0, stringIntegerTuple2.f1);
                    }
                });
        result.output(JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://bigdata-pro-m01.kfk.com/flink")
                .setUsername("root")
                .setPassword("12345678")
                .setQuery("insert into wordcount (word, count) values (?,?)")
                .finish());
        env.execute();
    }
}
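To run this example, the JDBC connector module (flink-jdbc in older Flink releases, flink-connector-jdbc in newer ones) and the MySQL driver jar (mysql-connector-java, which provides com.mysql.jdbc.Driver) have to be on the classpath, and the target database needs a matching table, for instance a hypothetical CREATE TABLE wordcount (word VARCHAR(64), count INT); the exact column types are an assumption, not part of the original article.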






