一.简介
Source 是Flink的输入,可以串行,并行,延迟,设置时间窗口等等。
Sink 是 Flink 的输出,可以将处理结果写出到任意外部系统(数据库、文件、消息队列等)。
Flink 的编程模型可以概括为:接入 Source,然后进行数据转换操作,再将处理结果通过 Sink 输出。
二.SourceFunction
非并行
/**
 * Non-parallel source (implements plain SourceFunction, not ParallelSourceFunction)
 * that emits an increasing counter starting at 0, one value per second, until cancelled.
 */
class CustomNonParallelSourceFunction extends SourceFunction[Long] {
  var count = 0L
  // Cleared by cancel(), which is invoked from a different thread than run();
  // @volatile guarantees the emitting loop observes the change.
  @volatile var isRunning = true

  /** Emits `count` once per second until cancel() is called. */
  override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = {
    // A loop (not a single `if`) so the source keeps producing until cancelled.
    while (isRunning) {
      sourceContext.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  /** Signals run() to stop after its current iteration. */
  override def cancel(): Unit = {
    isRunning = false
  }
}
并行
/**
 * Parallel source: every parallel instance independently emits an increasing counter
 * starting at 1, one value per second, until cancelled.
 */
class CustomParallelSourceFunction extends ParallelSourceFunction[Long] {
  // Cleared by cancel() from another thread; @volatile makes the write visible to run().
  @volatile var isRunning = true
  var count = 1L

  /** Emits `count` once per second until cancel() is called. */
  override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      sourceContext.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  /** Signals run() to stop after its current iteration. */
  override def cancel(): Unit = {
    isRunning = false
  }
}
富函数并行
// Rich variant: in addition to run/cancel it inherits lifecycle hooks such as open() and close().
/**
 * Rich parallel source: every parallel instance emits an increasing counter starting
 * at 1, one value per second, until cancelled.
 */
class CustomRichParallelSourceFunction extends RichParallelSourceFunction[Long] {
  var count = 1L
  // Cleared by cancel() from another thread; @volatile makes the write visible to run().
  @volatile var isRunning = true

  /** Emits `count` once per second until cancel() is called. */
  override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      sourceContext.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  /** Signals run() to stop after its current iteration. */
  override def cancel(): Unit = {
    isRunning = false
  }
}
读取MySQL数据源
/**
 * Source that reads all rows of table `soure1` from MySQL once and emits each row as a SourceBean.
 *
 * NOTE: despite its name this extends RichSourceFunction (not RichParallelSourceFunction),
 * so it runs as a non-parallel source.
 */
class MySQLRichParallelSourceFunction extends RichSourceFunction[SourceBean] {
  // Cleared by cancel(), which runs on a different thread than run().
  @volatile var isRUNNING: Boolean = true
  var ps: PreparedStatement = null
  var conn: Connection = null

  /**
   * Opens a JDBC connection to the MySQL instance.
   *
   * @return an open connection (never null)
   * @throws IllegalStateException wrapping the underlying cause if the driver cannot be
   *                               loaded or the connection cannot be established
   */
  def getConnection(): Connection = {
    val url: String = "jdbc:mysql://192.168.200.115:3306/kd_test?useSSL=false"
    val user: String = "root"
    val password: String = "root"
    try {
      Class.forName("com.mysql.jdbc.Driver")
      DriverManager.getConnection(url, user, password)
    } catch {
      // Fail fast with the real cause instead of returning null, which previously
      // surfaced later as an unexplained NullPointerException in open().
      case scala.util.control.NonFatal(e) =>
        throw new IllegalStateException(s"Failed to connect to MySQL at $url", e)
    }
  }

  /** Executes the prepared query and emits one SourceBean per row until exhausted or cancelled. */
  override def run(sourceContext: SourceFunction.SourceContext[SourceBean]): Unit = {
    val resSet: ResultSet = ps.executeQuery()
    try {
      // `&&` short-circuits: once cancelled, no further row is fetched from the cursor.
      while (isRUNNING && resSet.next()) {
        sourceContext.collect(
          SourceBean(
            resSet.getString("id"),
            resSet.getString("name"),
            resSet.getInt("age"),
            resSet.getDate("ctime")))
      }
    } finally {
      resSet.close()
    }
  }

  /** Signals run() to stop before fetching the next row. */
  override def cancel(): Unit = {
    isRUNNING = false
  }

  /** Opens the connection and prepares the SELECT statement. */
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    conn = this.getConnection()
    val sql = "select * from soure1"
    ps = this.conn.prepareStatement(sql)
  }

  /** Releases the statement first, then the connection that owns it. */
  override def close(): Unit = {
    try {
      if (ps != null) {
        ps.close()
      }
    } finally {
      if (conn != null) {
        conn.close()
      }
    }
  }
}
三.SinkFunction
保存数据到MySQL中
/** Sink that inserts every incoming SourceBean as one row into MySQL table `soure1`. */
class RichSinkFunctionToMySQL extends RichSinkFunction[SourceBean] {
  // Not read anywhere in this class; kept only so the public member set is unchanged.
  var isRUNNING: Boolean = true
  var ps: PreparedStatement = null
  var conn: Connection = null

  /**
   * Opens a JDBC connection to the MySQL instance.
   *
   * @return an open connection (never null)
   * @throws IllegalStateException wrapping the underlying cause if the driver cannot be
   *                               loaded or the connection cannot be established
   */
  def getConnection(): Connection = {
    val url: String = "jdbc:mysql://192.168.200.115:3306/kd_test?useSSL=false"
    val user: String = "root"
    val password: String = "root"
    try {
      Class.forName("com.mysql.jdbc.Driver")
      DriverManager.getConnection(url, user, password)
    } catch {
      // Fail fast with the real cause instead of returning null, which previously
      // surfaced later as an unexplained NullPointerException in open().
      case scala.util.control.NonFatal(e) =>
        throw new IllegalStateException(s"Failed to connect to MySQL at $url", e)
    }
  }

  /**
   * Opens the MySQL connection and prepares the INSERT statement.
   * @param parameters Flink configuration passed to the function
   */
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    conn = getConnection()
    val sql: String = "insert into soure1(id, name , age,ctime) values(?, ?, ?, ?);"
    ps = this.conn.prepareStatement(sql)
  }

  /**
   * Binds the record's fields to the prepared INSERT and executes it; called once per record.
   *
   * @param value record to insert; a null `ctime` is written as SQL NULL
   */
  override def invoke(value: SourceBean): Unit = {
    ps.setString(1, value.id)
    ps.setString(2, value.name)
    ps.setInt(3, value.age)
    // ResultSet.getDate returns null for SQL NULL, so upstream beans may carry a null ctime.
    if (value.ctime == null) {
      ps.setNull(4, java.sql.Types.DATE)
    } else {
      ps.setDate(4, new java.sql.Date(value.ctime.getTime))
    }
    ps.executeUpdate()
  }

  /** Releases the statement first, then the connection that owns it. */
  override def close(): Unit = {
    try {
      if (ps != null) {
        ps.close()
      }
    } finally {
      if (conn != null) {
        conn.close()
      }
    }
  }
}
四.组合
综合上述函数,组合成一个完整的编程模型:输入 -> 转换 -> 输出。
/** Demo job: reads rows from MySQL, appends "1" to each id, and writes the rows back to MySQL. */
object DataStreamSinkToMysqlApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Brings the implicit TypeInformation needed by `map` into scope.
    import org.apache.flink.api.scala._

    // Source -> transform -> sink, expressed as a single fluent chain.
    env
      .addSource(new MySQLRichParallelSourceFunction)
      .map(bean => SourceBean(s"${bean.id}1", bean.name, bean.age, bean.ctime))
      .addSink(new RichSinkFunctionToMySQL)

    env.execute("DataStreamSinkToMysqlApp")
  }
}
文章转载自李孟的博客,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




