
[Big Data Development] Flink DataSet Output (Part 11)

数据信息化 2020-06-26



Flink DataSet Output (Part 11)

01

Flink Sink: TextFile

    Writes DataSet data to a file system as text via TextOutputFormat. The target can be the local file system or HDFS; Flink determines which from the scheme prefix of the user-specified path.

dataSet.writeAsText(localFilePath)
dataSet.writeAsText(hdfsFilePath)


    writeAsText writes the DataSet directly to the specified file. With the write-related output methods, the user can also specify a write mode, either OVERWRITE or NO_OVERWRITE: the former overwrites any existing content at the target path, while the latter (the default) fails if the target already exists. Neither mode appends to an existing file.
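
    For example, a minimal sketch of choosing a write mode (the path is a placeholder):

import org.apache.flink.core.fs.FileSystem;

// Overwrite any existing file or directory at the target path.
dataSet.writeAsText("file:///tmp/wordcount.txt", FileSystem.WriteMode.OVERWRITE);

// Default behavior: fail if the target path already exists.
dataSet.writeAsText("file:///tmp/wordcount.txt", FileSystem.WriteMode.NO_OVERWRITE);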


//Java


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;


public class TextSinkJavaDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> source = env.fromElements(
                "spark hbase java",
                "java spark hive",
                "java hbase hbase"
        );

        env.setParallelism(1);

        // Split each line into (word, 1) tuples, sum the counts per word,
        // then write the result as formatted text.
        DataSink<Tuple2<String, Integer>> result = source
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        for (String word : s.split(" ")) {
                            collector.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                        return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
                    }
                })
                // .writeAsText("file:///Users/zhangjingyu/Desktop/wordcount.txt", FileSystem.WriteMode.OVERWRITE);
                .writeAsFormattedText("file:///Users/zhangjingyu/Desktop/wordcount.txt",
                        new TextOutputFormat.TextFormatter<Tuple2<String, Integer>>() {
                            @Override
                            public String format(Tuple2<String, Integer> tuple) {
                                return tuple.f0 + "-" + tuple.f1;
                            }
                        });
        env.execute();
    }
}






//Scala


import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.util.Collector


object TextSinkScalaDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val source = env.fromElements(
      "spark hbase java",
      "java spark hive",
      "java hbase hbase"
    )
    env.setParallelism(1)

    // Split each line into (word, 1) tuples, sum the counts per word,
    // then write the result as plain text, overwriting any existing file.
    val result = source
      .flatMap((line: String, collector: Collector[(String, Int)]) =>
        line.split(" ").foreach(word => collector.collect((word, 1))))
      .groupBy(0)
      .reduceGroup(iter => iter.reduce((a, b) => (a._1, a._2 + b._2)))
      .writeAsText("file:///Users/zhangjingyu/Desktop/wordcount.txt", WriteMode.OVERWRITE)

    env.execute()
  }
}


02

Flink Sink: CsvFile

    This method writes the dataset to the specified file system in CSV format; the output method accepts CSV-specific options such as the row delimiter and field delimiter. Note that writeAsCsv is only available for DataSets of tuples.

dataSet.writeAsCsv(path, rowDelimiter, fieldDelimiter)
For example:
dataSet.writeAsCsv("file:///path/file", "\n", ",")


//Java


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;


public class CsvSinkJavaDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> source = env.fromElements(
                "spark hbase java",
                "java spark hive",
                "java hbase hbase"
        );

        env.setParallelism(1);

        // Count words, then write the (word, count) tuples as CSV with
        // "\n" as the row delimiter and "|" as the field delimiter.
        DataSink<Tuple2<String, Integer>> result = source
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        for (String word : s.split(" ")) {
                            collector.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                        return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
                    }
                })
                .writeAsCsv("file:///Users/zhangjingyu/Desktop/wordcount.csv",
                        "\n",
                        "|",
                        FileSystem.WriteMode.OVERWRITE);
        env.execute();
    }
}




//Scala
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.util.Collector


object CsvSinkScalaDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val source = env.fromElements(
      "spark hbase java",
      "java spark hive",
      "java hbase hbase"
    )
    env.setParallelism(1)

    // Count words, then write the (word, count) tuples as CSV with
    // "\n" as the row delimiter and "|" as the field delimiter.
    val result = source
      .flatMap((line: String, collector: Collector[(String, Int)]) =>
        line.split(" ").foreach(word => collector.collect((word, 1))))
      .groupBy(0)
      .reduceGroup(iter => iter.reduce((a, b) => (a._1, a._2 + b._2)))
      .writeAsCsv("file:///Users/zhangjingyu/Desktop/wordcount.csv",
        "\n",
        "|",
        WriteMode.OVERWRITE)

    env.execute()
  }
}
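
For reference, with parallelism set to 1 the CSV sink above should produce a single output file whose rows look like the following (row order may differ between runs):

spark|2
hbase|3
java|3
hive|1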


03

Flink Sink: MySQL

    JDBCOutputFormat from the flink-jdbc module allows a DataSet to be written to MySQL over JDBC. The connection and the insert statement are configured through a builder:

//MySQL connection parameters
dataSet.output(
    JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername("com.mysql.jdbc.Driver")
        .setDBUrl("jdbc:mysql://bigdata-pro-m01.kfk.com/flink")
        .setUsername("root")
        .setPassword("12345678")
        .setQuery("insert into wordcount (word, count) values (?,?)")
        .finish());
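
    Besides the parameters shown above, the builder also lets you control how many rows are buffered before a batch insert is flushed. A minimal sketch, assuming flink-jdbc and mysql-connector-java are on the classpath and treating the batch size of 100 as an arbitrary choice:

dataSet.output(
    JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername("com.mysql.jdbc.Driver")
        .setDBUrl("jdbc:mysql://bigdata-pro-m01.kfk.com/flink")
        .setUsername("root")
        .setPassword("12345678")
        .setQuery("insert into wordcount (word, count) values (?,?)")
        // flush buffered rows to MySQL every 100 records (assumed batch size)
        .setBatchInterval(100)
        .finish());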
//Java


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;


public class MysqlSinkJavaDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> source = env.fromElements(
                "spark hbase java",
                "java spark hive",
                "java hbase hbase"
        );

        // Count words (uppercased), then map each (word, count) tuple to a Row,
        // since JDBCOutputFormat writes a DataSet<Row>.
        DataSet<Row> result = source
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        for (String word : s.toUpperCase().split(" ")) {
                            collector.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                        return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
                    }
                })
                .map(new MapFunction<Tuple2<String, Integer>, Row>() {
                    @Override
                    public Row map(Tuple2<String, Integer> tuple) throws Exception {
                        return Row.of(tuple.f0, tuple.f1);
                    }
                });
        result.output(
                JDBCOutputFormat.buildJDBCOutputFormat()
                        .setDrivername("com.mysql.jdbc.Driver")
                        .setDBUrl("jdbc:mysql://bigdata-pro-m01.kfk.com/flink")
                        .setUsername("root")
                        .setPassword("12345678")
                        .setQuery("insert into wordcount (word, count) values (?,?)")
                        .finish());
        env.execute();
    }
}
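
    Note that the demo assumes the wordcount table already exists in the flink database, with one string column and one integer column, e.g. a hypothetical DDL such as CREATE TABLE wordcount (word VARCHAR(64), `count` INT). The MySQL JDBC driver (mysql-connector-java) must also be on the job's classpath.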



