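Drawing on the official documentation and other online material, this article explains Flink's asynchronous I/O API for accessing external data stores. Readers who are unfamiliar with asynchronous or event-driven programming may want to read up on Futures and event-driven programming first.
Official Flink async I/O documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/asyncio/
My program works roughly as follows: it receives user records (three fields: id, userNum, age) from a socket stream, uses each record's userNum to look up the user's name in the name column of the MySQL dictionary table userCode, fills that value into the record's userName field, and finally writes the enriched record to the MySQL table userInfo.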
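The first step of that flow, turning one socket line into its three fields, can be sketched in plain Java. This is only an illustration: the class UserLineParser and its parse method are hypothetical and not part of the project, and the input validation is an assumption added here rather than something the original code does.

```java
import java.util.Arrays;

// Hypothetical helper, not part of the original project: parses one socket line
// of the form "id,userNum,age" into three ints.
class UserLineParser {

    /** Returns {id, userNum, age}; throws IllegalArgumentException on malformed input. */
    static int[] parse(String line) {
        String[] parts = line.trim().split(",");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected 3 comma-separated fields: " + line);
        }
        int[] fields = new int[3];
        for (int i = 0; i < 3; i++) {
            // Integer.parseInt throws NumberFormatException, a subclass of IllegalArgumentException.
            fields[i] = Integer.parseInt(parts[i].trim());
        }
        return fields;
    }

    public static void main(String[] args) {
        // Uses the first sample record from this article.
        System.out.println(Arrays.toString(parse("1,1001,23")));
    }
}
```

Rejecting malformed lines early keeps a bad record from surfacing later as an ArrayIndexOutOfBoundsException inside the async function.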
1. Table creation statements:
DDL for userCode (the dictionary table) and userInfo (the target table):
CREATE TABLE `userInfo` (
  `id` int(11) NOT NULL,
  `userNum` int(11) DEFAULT NULL,
  `userName` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
CREATE TABLE `userCode` (
  `id` int(11) NOT NULL,
  `name` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
2. The pom.xml file. Alibaba's druid connection pool is included here. The ${flink.version} and ${scala.binary.version} placeholders are expected to be defined in the pom's <properties> section, which is not shown.
<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
  </dependency>
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.20</version>
  </dependency>
</dependencies>
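3. The overall code structure is as follows:
Main class: FlinkMysqlAsyncIOMain
Async I/O class, mainly used to query MySQL: MysqlAsyncRichMapFunction
Sink, which writes the results to MySQL: MysqlSinkFunction
Database utility class: DBUtils
User entity class: User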
package com.hadoop.ljs.flink112.study.asyncIO;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

/**
 * @author: Created By lujisen
 * @company China JiNan
 * @date: 2021-08-18 14:50
 * @version: v1.0
 * @description: com.hadoop.ljs.flink112.study.asyncIO
 */
public class FlinkMysqlAsyncIOMain {

    public static void main(String[] args) throws Exception {
        // Maximum number of async requests in flight at the same time.
        int maxIORequestCount = 20;

        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Each line read from the socket has the form: id,userNum,age
        DataStream<String> socketDataStream = senv.socketTextStream("192.168.0.111", 9999);

        DataStream<User> userDataStream = AsyncDataStream.orderedWait(
                socketDataStream,
                new MysqlAsyncRichMapFunction(), // custom async function that queries MySQL
                500000,                          // async timeout: 500000 ms, i.e. 500 seconds
                TimeUnit.MILLISECONDS,           // unit of the timeout
                maxIORequestCount                // capacity: maximum concurrent async requests
        );

        userDataStream.print();
        userDataStream.addSink(new MysqlSinkFunction<User>());
        senv.execute("FlinkMysqlAsyncIOMain");
    }
}
package com.hadoop.ljs.flink112.study.asyncIO;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;

/**
 * @author: Created By lujisen
 * @company China JiNan
 * @date: 2021-08-18 10:25
 * @version: v1.0
 * @description: com.hadoop.ljs.flink112.study.asyncIO
 */
public class MysqlAsyncRichMapFunction extends RichAsyncFunction<String, User> {

    /**
     * The thread pool and the MySQL connection pool are created statically in DBUtils,
     * so this function holds no client field of its own and open() has nothing to set up.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    @Override
    public void asyncInvoke(String line, ResultFuture<User> resultFuture) throws Exception {
        // Input line format: id,userNum,age
        String[] split = line.split(",");
        User user = new User();
        user.setId(Integer.valueOf(split[0]));
        user.setUserNum(Integer.valueOf(split[1]));
        user.setAge(Integer.valueOf(split[2]));

        // Run the blocking JDBC query on the DBUtils thread pool.
        Future<String> dbResult = DBUtils.executorService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                ResultSet resultSet = null;
                PreparedStatement statement = null;
                String sql = "SELECT id, name FROM userCode WHERE id = ?";
                String userName = null;
                // Test hook: delay the record with userNum 1001 by 30 seconds.
                if (user.getUserNum() == 1001) {
                    System.out.println("Current userNum: " + user.getUserNum() + ", sleeping for 30 seconds");
                    Thread.sleep(30000);
                }
                try {
                    statement = DBUtils.getConnection().prepareStatement(sql);
                    statement.setInt(1, user.getUserNum());
                    resultSet = statement.executeQuery();
                    while (resultSet.next()) {
                        userName = resultSet.getString("name");
                    }
                } finally {
                    if (resultSet != null) {
                        resultSet.close();
                    }
                    if (statement != null) {
                        statement.close();
                    }
                }
                return userName;
            }
        });

        // Wait for the query result off the operator thread, then hand it back to Flink.
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    return dbResult.get();
                } catch (InterruptedException | ExecutionException e) {
                    // Handle the exception explicitly: a failed lookup yields a null userName.
                    return null;
                }
            }
        }).thenAccept((String userName) -> {
            user.setUserName(userName);
            resultFuture.complete(Collections.singleton(user));
        });
    }

    @Override
    public void close() throws Exception {
        super.close();
    }
}
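The hand-off in asyncInvoke uses two steps: a Future from the thread pool, then a second task that blocks on it. A possible alternative, shown here only as a plain-Java sketch, is to pass the pool straight to CompletableFuture.supplyAsync and map failures with handle, so no extra thread waits on Future.get(). Everything in it is an assumption for illustration: the class and method names are hypothetical, and the in-memory Map with made-up names stands in for the userCode table.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only: the Map stands in for the userCode table, and all names here
// (AsyncLookupSketch, lookupName, lookupAsync) are hypothetical.
class AsyncLookupSketch {

    // Assumed sample rows; the real table contents are not given in the article.
    static final Map<Integer, String> USER_CODE = new HashMap<>();
    static {
        USER_CODE.put(1001, "name-1001");
        USER_CODE.put(1002, "name-1002");
    }

    /** Blocking lookup that throws for unknown keys, imitating a failed query. */
    static String lookupName(int userNum) {
        String name = USER_CODE.get(userNum);
        if (name == null) {
            throw new IllegalStateException("no row for userNum " + userNum);
        }
        return name;
    }

    /** Runs the blocking lookup on the given pool; a failure is mapped to null. */
    static CompletableFuture<String> lookupAsync(int userNum, ExecutorService pool) {
        return CompletableFuture
                .supplyAsync(() -> lookupName(userNum), pool)
                .handle((name, error) -> error == null ? name : null);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(lookupAsync(1001, pool).join()); // successful lookup
            System.out.println(lookupAsync(9999, pool).join()); // failed lookup, mapped to null
        } finally {
            pool.shutdown();
        }
    }
}
```

In a Flink async function the final stage would call resultFuture.complete(...) instead of returning the value. Whether to map a failure to null, as the class above does, or to report it with resultFuture.completeExceptionally(...) is a design choice that depends on how the job should treat failed lookups.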
package com.hadoop.ljs.flink112.study.asyncIO;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.PreparedStatement;

/**
 * @author: Created By lujisen
 * @company China JiNan
 * @date: 2021-08-18 15:02
 * @version: v1.0
 * @description: com.hadoop.ljs.flink112.study.asyncIO
 */
public class MysqlSinkFunction<U> extends RichSinkFunction<User> {

    // Plain INSERT: a second row with the same primary key (id) will fail.
    private static final String INSERT_SQL =
            "INSERT INTO userInfo(id, userNum, userName, age) VALUES (?, ?, ?, ?)";

    private PreparedStatement statement = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        statement = DBUtils.getConnection().prepareStatement(INSERT_SQL);
    }

    @Override
    public void invoke(User user, Context context) throws Exception {
        statement.setInt(1, user.getId());
        statement.setInt(2, user.getUserNum());
        statement.setString(3, user.getUserName());
        statement.setInt(4, user.getAge());
        // One row is written per incoming record.
        statement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (statement != null) {
            statement.close();
        }
    }
}
package com.hadoop.ljs.flink112.study.asyncIO;

import com.alibaba.druid.pool.DruidDataSource;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.ExecutorService;

import static java.util.concurrent.Executors.newFixedThreadPool;

/**
 * @author: Created By lujisen
 * @company China JiNan
 * @date: 2021-08-18 15:19
 * @version: v1.0
 * @description: com.hadoop.ljs.flink112.study.asyncIO
 */
public class DBUtils {

    private static String jdbcUrl = "jdbc:mysql://192.168.0.111:3306/lujisen?characterEncoding=utf8";
    private static String username = "root";
    private static String password = "123456a?";
    private static String driverName = "com.mysql.jdbc.Driver";

    public static Connection connection = null;
    public static ExecutorService executorService = null;
    public static DruidDataSource dataSource = null;

    /* Size of the thread pool, also used as the connection pool's maximum active connections. */
    private static int maxPoolConn = 20;

    /* Static initialisation: runs once per JVM when the class is loaded. */
    static {
        // Create the thread pool.
        executorService = newFixedThreadPool(maxPoolConn);
        // Configure the Druid connection pool.
        dataSource = new DruidDataSource();
        dataSource.setDriverClassName(driverName);
        dataSource.setUsername(username);
        dataSource.setPassword(password);
        dataSource.setUrl(jdbcUrl);
        dataSource.setMaxActive(maxPoolConn);
    }

    /**
     * Lazily takes one connection from the pool and returns that same connection to every caller.
     * Note that the async function and the sink therefore share a single connection in each JVM.
     */
    public static synchronized Connection getConnection() throws SQLException {
        if (connection == null) {
            connection = dataSource.getConnection();
        }
        return connection;
    }
}
package com.hadoop.ljs.flink112.study.asyncIO;

/**
 * @author: Created By lujisen
 * @company China JiNan
 * @date: 2021-08-18 10:13
 * @version: v1.0
 * @description: com.hadoop.ljs.flink112.study.asyncIO
 */
public class User {

    int id;
    int userNum;
    String userName;
    int age;

    public User() {
    }

    public User(int id, int userNum, String userName, int age) {
        this.id = id;
        this.userNum = userNum;
        this.userName = userName;
        this.age = age;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public int getUserNum() {
        return userNum;
    }

    public void setUserNum(int userNum) {
        this.userNum = userNum;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{" + "id=" + id + ", userNum=" + userNum + ", userName='" + userName + '\'' + ", age=" + age + '}';
    }
}
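Note: for testing it is enough to send the two records below. The main function sets the Flink async I/O timeout to 500000 ms (500 seconds), and asyncInvoke in MysqlAsyncRichMapFunction sleeps for 30 seconds when a record's userNum is 1001. The sleep runs on a thread-pool thread, so the first record does not block the operator: the second record is looked up right away. In the run described here, the second record reaches the userInfo table first and the first record arrives about 30 seconds later, which can be checked by querying userInfo. Keep in mind that orderedWait emits results in input order within a single parallel subtask, so the second row can overtake the first only when the two records are handled by different subtasks. To let results leave in completion order regardless of that, AsyncDataStream.unorderedWait can be used instead.
1,1001,23    (first record)
2,1002,24    (second record)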
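The difference between "finishes first" and "is emitted first" can be reproduced without Flink. The following is a plain-Java simulation, not Flink code: the class CompletionOrderSketch and its methods are hypothetical, the lookup is faked, and the 30-second sleep is shortened to 300 ms. It submits the two sample userNum values and collects the results once in completion order and once in input order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java simulation, not Flink code. Class and method names are hypothetical,
// and the 30-second sleep from the article is shortened to 300 ms.
class CompletionOrderSketch {

    /** Pretends to query the dictionary table; userNum 1001 is artificially slow. */
    static String slowLookup(int userNum) {
        if (userNum == 1001) {
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return "user-" + userNum;
    }

    /** Collects results as they complete, similar in spirit to unordered emission. */
    static List<String> completionOrder() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<String> done = Collections.synchronizedList(new ArrayList<>());
        try {
            CompletableFuture<Void> first =
                    CompletableFuture.supplyAsync(() -> slowLookup(1001), pool).thenAccept(done::add);
            CompletableFuture<Void> second =
                    CompletableFuture.supplyAsync(() -> slowLookup(1002), pool).thenAccept(done::add);
            CompletableFuture.allOf(first, second).join();
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(done);
    }

    /** Waits for results in submission order, similar in spirit to ordered emission. */
    static List<String> inputOrder() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<CompletableFuture<String>> pending = new ArrayList<>();
            pending.add(CompletableFuture.supplyAsync(() -> slowLookup(1001), pool));
            pending.add(CompletableFuture.supplyAsync(() -> slowLookup(1002), pool));
            List<String> emitted = new ArrayList<>();
            for (CompletableFuture<String> future : pending) {
                // The fast second result is held back until the slow first one is ready.
                emitted.add(future.join());
            }
            return emitted;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("completion order: " + completionOrder());
        System.out.println("input order:      " + inputOrder());
    }
}
```

In both variants the two lookups run concurrently, so the total time is about one sleep rather than the sum. Only the order in which results are handed on differs: the slow record comes last in the first list and first in the second.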






