暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink SourceFunction/SinkFunction

李孟的博客 2020-10-21
811

一.简介

Source 是Flink的输入,可以串行,并行,延迟,设置时间窗口等等。

Sink是Flink的输出,设置任意源。

Flink的编程模型就可以概况成接入Source,然后进行数据转换操作,再讲处理结果Sink出来。

二.SourceFunction

非并行

class CustomNonParallelSourceFunction extends SourceFunction[Long{
  var count = 0L
  var isRunning = true
  override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = {
    if(isRunning){
      sourceContext.collect(count)
      count +=1
      Thread.sleep(1000)
    }
  }
  override def cancel(): Unit = {
    isRunning = false
  }
}

并行

class CustomParallelSourceFunction extends ParallelSourceFunction[Long{
  var isRunning = true
  var count = 1L
  override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning){
      sourceContext.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }
  override def cancel(): Unit = {
    isRunning = false
  }
}

富函数并行

//包含一些open close函数
class CustomRichParallelSourceFunction  extends RichParallelSourceFunction[Long{
  var count = 1L
  var isRunning = true
  override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning){
      sourceContext.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }
  override def cancel(): Unit = {
    isRunning = false
  }
}

读取MySQL数据源

class MySQLRichParallelSourceFunction extends RichSourceFunction[SourceBean{
  var isRUNNING: Boolean = true
  var ps: PreparedStatement = null
  var conn: Connection = null
  /**
   * 建立连接
   */

  def getConnection():Connection = {
    var conn:Connection = null
    val url:String = "jdbc:mysql://192.168.200.115:3306/kd_test?useSSL=false"
    val user: String = "root"
    val password: String = "root"
    try {
      Class.forName("com.mysql.jdbc.Driver")
      conn = DriverManager.getConnection(url, user, password)
    }catch {
      case _: Throwable => println("due to the connect error then exit!")
    }
    conn
  }

  override def run(sourceContext: SourceFunction.SourceContext[SourceBean]): Unit = {
    val resSet:ResultSet = ps.executeQuery()
    while(isRUNNING & resSet.next()) {
      sourceContext.collect(SourceBean(resSet.getString("id"),resSet.getString("name"),resSet.getInt("age"),resSet.getDate("ctime")))
    }
  }
  override def cancel(): Unit = {
    isRUNNING = false
  }
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    conn = this.getConnection()
    val sql = "select * from soure1"
    ps = this.conn.prepareStatement(sql)
  }
  override def close(): Unit = {
      if(conn != null){
        conn.close()
      }
      if(ps != null){
        ps.close()
      }
  }
}

三.SinkFunction

保存数据到MySQL中

class RichSinkFunctionToMySQL extends RichSinkFunction[SourceBean]{
var isRUNNING: Boolean = true
var ps: PreparedStatement = null
var conn: Connection = null
// 建立连接
def getConnection():Connection = {
var conn: Connection = null
val url: String = "jdbc:mysql://192.168.200.115:3306/kd_test?useSSL=false"
val user: String = "root"
val password: String = "root"
try{
Class.forName("com.mysql.jdbc.Driver")
conn = DriverManager.getConnection(url, user, password)
} catch {
case _: Throwable => println("due to the connect error then exit!")
}
conn
}
/**
* open()初始化建立和 MySQL 的连接
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
super.open(parameters)
conn = getConnection()
val sql: String = "insert into soure1(id, name , age,ctime) values(?, ?, ?, ?);"
ps = this.conn.prepareStatement(sql)
}
/**
* 组装数据,进行数据的插入操作
* 对每条数据的插入都要调用invoke()方法
*
* @param value
*/
override def invoke(value: SourceBean): Unit = {
ps.setString(1, value.id)
ps.setString(2, value.name)
ps.setInt(3, value.age)
ps.setDate(4, new Date(value.ctime.getTime))
ps.executeUpdate()
}
override def close(): Unit = {
if (conn != null) {
conn.close()
}
if(ps != null) {
ps.close()
}
}
}

四.组合

综合上述的一些函数,复合一个编程模型,输入->转换->输出

object DataStreamSinkToMysqlApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val data = env.addSource(new MySQLRichParallelSourceFunction)
    val stream = data.map(m=>{
      val id = m.id +"1"
      SourceBean(id,m.name,m.age,m.ctime)
    })
    stream.addSink(new RichSinkFunctionToMySQL)
    env.execute("DataStreamSinkToMysqlApp")
  }
}


文章转载自李孟的博客,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论