背景
Flink1.16之前版本,SQL Client(Flink内置组件)仅支持本地提交SQL作业的功能。为了满足实际场景需要远程提交SQL的需求,出现很多良莠不齐的落地方案。诸如:Apache Zeppelin、Apache Kyuubi、flink-sql-gateway(ververica)、Apache Streampark等等。个人认为这些方案都存在一个致命痛点:无法及时紧跟Flink版本迭代发展、支持的功能点参差不齐(如:不支持selsect等)。借鉴Kyuubi使用Flink SQL的构架模型,基于FLIP-91: Support SQL Gateway Flink社区1.16版本开始内置自己的SQL Gateway组件,虽然当前版本功能点也不完善(如:不支持kerberos),相信社区能够快速补齐,解决上述痛点问题。
架构
SQL Gateway提供一种多客户端、远程并发执行SQL的能力,功能模块包括:SqlGateWayService(用于处理所有Endpoints请求)、Endpoints(可插拔,用于提供用户访问的入口)。目前版本中Endpoints类型分为:REST Endpoint、HiveServer2 Endpoint。
REST Endpoint:提供restful访问方式,适用于原生Flink用户使用SQL的场景。HiveServer2 Endpoint:提供jdbc访问方式,与HiveServer2语法兼容达到94%,适用于Hive用户需要使用Flink SQL的场景。

核心概念
Session
Session表示访问SqlGateway的用户,SessionHandle表示用户ID、与Session一一对应;用户每申请创建一个Session、SqlGateway会返回一个唯一标识的SessionHandle。除了标识用户信息,也作为隔离对象,比如配置隔离、权限隔离、catalog元数据隔离等。
Operation
每个用户请求的内容(如SQL语句)都会被SqlGateway转换为一个Operation,SqlGateway会返回一个唯一标识的OperationHandle。
SessionManager
SessionManager负责管理Session,比如创建、配置、查看配置或状态、关闭等。其中,关闭动作可以是用户申请操作的、也可以是SqlGateway基于过期机制周期检测关闭的。
OperationManager
OperationManager负责管理Operation,比如初始化、执行、清理等。当Session关闭时,SqlGateway会自动清理该Session对应的所有Operation。
上述概念组织结构如图所示:

案例实践
SqlGateway支持提交两种运行模式的作业到Yarn集群,参数设置如下:
-Dexecution.target=yarn-per-job-Dexecution.target=yarn-session –Dyarn.application.id= application_1677827030423_0018
其中:application_1677827030423_0018为已经运行的yarn-session集群applicationID。
REST Endpoint模式
以yarn-per-job运行模式为例:
./sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=felixzh -Dexecution.target=yarn-per-job

1. 创建session:
http://felixzh:8083/v1/sessions

2. 创建输入表:
http://felixzh:8083/v1/sessions/3d2a6c8f-12c7-4fe3-af44-2da8cde8f537/statements

3. 创建输出表:
http://felixzh:8083/v1/sessions/3d2a6c8f-12c7-4fe3-af44-2da8cde8f537/statements

4. 提交SQL作业:
http://felixzh:8083/v1/sessions/3d2a6c8f-12c7-4fe3-af44-2da8cde8f537/statements

5. 查看作业运行情况

说明:目前仅支持单请求单SQL的情况,如下:

具体支持哪些rest接口,可以参照源码接口文档遍历验证,路径为:
https://github.com/apache/flink/blob/master/flink-table/flink-sql-gateway/src/test/resources

HiveServer2 Endpoint模式
准备flink-connector-hive_2.12-1.16.1.jar和hive-exec.jar到flink/lib目录。
以yarn-per-job运行模式为例:
./sql-gateway.sh start -Dsql-gateway.endpoint.type=hiveserver2 -Dsql-gateway.endpoint.hiveserver2.thrift.port=1005 -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=/etc/hive/conf -Dexecution.target=yarn-per-job

适配Kerberos改造
出于安全性考虑,实际生产环境Hadoop集群通常会开启Kerberos认证。SqlGateway将作业提交到Yarn集群时,就需要支持Kerberos认证。目前社区版本中还未进行适配支持,我已经提交问题单FLINK-31335进行跟踪:

也已经提交了相关PR进行适配改造:

改造后的效果就是:默认使用flink-conf.yaml中security.kerberos.login.keytab和security.kerberos.login.principal,也支持-Dsecurity.kerberos.login.keytab、-Dsecurity.kerberos.login.principal进行覆盖设置。有需要或者感兴趣的朋友,可以自行查看。




