前言
本系列文章从0到1记录Flink Docker & k8s容器化实践总结,本文属于第四篇。本篇将记录基于K8S构建Flink最新版本2.1.0的集群的使用方式:Flink Native Kubernetes(云原生)模式之一的application集群模式,生产环境建议使用该模式,隔离性更强!强烈建议不要再使用standalone k8s。文章发布于微信公众号:大数据从业者,其它均为转载,原创不易,欢迎您点赞关注推荐转发!🙏

制作镜像
application模式可以选择将作业Jar打包到Flink镜像中,即镜像内置jarmoshi (非必须的,也可以使用外部放置Jar模式,比如:http jar或s3 jar,三种方式下文都会介绍,推荐后两种,更灵活),DockerFile如下:
vim myFlinkDockerFileFROM flink:2.1.0-scala_2.12RUN mkdir -p $FLINK_HOME/usrlibCOPY ./WordCount.jar $FLINK_HOME/usrlib/WordCount.jar
执行打镜像命令(tag为v2.1.0):
docker build -f myFlinkDockerFile -t felixzh.flink:v2.1.0 .

将本地镜像加载到miniku内:
minikube image load felixzh.flink:v2.1.0
都使用k8s集群了,那么s3必不可少啊,需要基于上述镜像增加插件flink-s3-fs-hadoop-2.1.0.jar,DockerFile如下:

执行打镜像命令(tag为v2.1.0-s3),加载到miniku内:
docker build -f myFlinkDockerFileS3 -t felixzh.flink:v2.1.0-s3 .minikube image load felixzh.flink:v2.1.0-s3

提交作业
export FLINK_ENV_JAVA_OPTS_CLI="--add-opens java.base/java.util=ALL-UNNAMED"
镜像内置jar模式,提交方式:
./flink run \--target kubernetes-application \-Dkubernetes.namespace=felixzh-ns \-Dkubernetes.jobmanager.service-account=felixzh-flink \-Dkubernetes.cluster-id=felixzh-application-cluster \-Dkubernetes.container.image=felixzh.flink:v2.1.0 \-Dkubernetes.rest-service.exposed.type=NodePort \-Denv.java.opts.jobmanager="--add-opens java.base/java.util=ALL-UNNAMED" \-Denv.java.opts.taskmanager="--add-opens java.base/java.util=ALL-UNNAMED" \local:///opt/flink/usrlib/WordCount.jar
注意:上述local为容器内路径,而非本地路径!
minikube kubectl -- get deployment -A

外置http jar模式(开启user.artifacts.raw-http-enabled),提交方式:
./flink run \--target kubernetes-application \-Dkubernetes.namespace=felixzh-ns \-Dkubernetes.jobmanager.service-account=felixzh-flink \-Dkubernetes.cluster-id=felixzh-application-cluster \-Dkubernetes.container.image=felixzh.flink:v2.1.0 \-Dkubernetes.rest-service.exposed.type=NodePort \-Denv.java.opts.jobmanager="--add-opens java.base/java.util=ALL-UNNAMED" \-Denv.java.opts.taskmanager="--add-opens java.base/java.util=ALL-UNNAMED" \-Duser.artifacts.raw-http-enabled=true \http://ip:port/WordCount.jar
注意:http路径jar需要可访问!

外置s3 jar模式,提交方式:
./flink run \--target kubernetes-application \-Ds3.endpoint=http://ip:port \-Ds3.access-key=minioadmin \-Ds3.secret-key=minioadmin \-Dkubernetes.namespace=felixzh-ns \-Dkubernetes.jobmanager.service-account=felixzh-flink \-Dkubernetes.cluster-id=felixzh-application-cluster \-Dkubernetes.container.image=felixzh.flink:v2.1.0-s3 \-Dkubernetes.rest-service.exposed.type=NodePort \-Denv.java.opts.jobmanager="--add-opens java.base/java.util=ALL-UNNAMED" \-Denv.java.opts.taskmanager="--add-opens java.base/java.util=ALL-UNNAMED" \s3://flink/WordCount.jar

注意:配置s3相关参数s3.endpoint、s3.access-key、s3.secret-key!
minikube kubectl -- get pods -A

上述s3这里使用开源MinIO,部署见下文!
MinIO部署
https://www.min.io/open-source/download?platform=linux&arch=amd64wget https://dl.min.io/server/minio/release/linux-amd64/miniochmod +x minio./minio –versionmkdir data./minio server --address 0.0.0.0:8082 data/

注意:可以 export 环境变量 MINIO_ROOT_USER 和 MINIO_ROOT_PASSWORD。
停止作业
查询cluster-id通过delete删除pod:
minikube kubectl -- get all -n felixzh-ns | grep felixzh-application-clusterminikube kubectl -- delete deployment/felixzh-application-cluster -n felixzh-ns

总结
本文记录最新版本Flink2.1云原生Native k8s模式的application集群实践总结,后续继续更新flink-kubernetes-operator(智能运维)模式的实践。文章发布于微信公众号:大数据从业者,其它均为转载,原创不易,欢迎您点赞关注推荐转发!🙏





