暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

教育行业PyFlink整合FlinkML机器学习场景实践总结

大数据从业者 2024-10-28
216

前言   

本文主要记录教育行业高校PyFlink整合Flink ML的场景案例实践总结。PyFlink是可以使用Python语言开发Apache Flink的功能API,允许构建批或流任务、机器学习、ETL等场景,分为Table API和DataStreamAPI。

FlinkML类库提供机器学习API、简化构建机器学习流式管道的复杂度,支持Java、Python语言,提供分类、聚类、回归、推荐、特征工程等多种场景的默认实现。欢迎关注微信公众号:大数据从业者

  

Flink ML模块   

源码编译

    git clone -b release-2.3.0 https://github.com/apache/flink-ml.git
    cd flink-ml && mvn clean package -DskipTests -Pflink-1.17

    拷贝Jar到Flink部署目录

      cp flink-ml-dist/target/flink-ml-2.3.0-bin/flink-ml-2.3.0/lib/flink-ml-uber-1.17-2.3.0.jar /home/myHadoopCluster/flink-1.17.1/lib/
      cp flink-ml-dist/target/flink-ml-2.3.0-bin/flink-ml-2.3.0/lib/statefun-flink-core-3.2.0.jar /home/myHadoopCluster/flink-1.17.1/lib/
      cp flink-ml-dist/target/flink-ml-2.3.0-bin/flink-ml-2.3.0/lib/flink-ml-examples-1.17-2.3.0.jar home/myHadoopCluster/flink-1.17.1/examples/

      验证Java语言测试用例

        ./flink run -t yarn-per-job -c org.apache.flink.ml.examples.clustering.KMeansExample ../examples/flink-ml-examples-1.17-2.3.0.jar

        Python虚拟环境  

        Flink ML使用Python语言开发,需要特定版本:3.7或者3.8。如果你的集群环境Python版本不满足,建议使用如下脚本工具setup-pyflink-virtual-env.sh构建Python虚拟环境:

          set -e
          # download miniconda.sh
          sys_os=$(uname -s)
          echo "Detected OS: ${sys_os}"
          sys_machine=$(uname -m)
          echo "Detected machine: ${sys_machine}"


          if [[ ${sys_os} == "Darwin" ]]; then
          wget "https://repo.anaconda.com/miniconda/Miniconda3-py310_23.5.2-0-MacOSX-${sys_machine}.sh" -O "miniconda.sh"
          elif [[ ${sys_os} == "Linux" ]]; then
          wget "https://repo.anaconda.com/miniconda/Miniconda3-py310_23.5.2-0-Linux-${sys_machine}.sh" -O "miniconda.sh"
          else
          echo "Unsupported OS: ${sys_os}"
          exit 1
          fi


          # add the execution permission
          chmod +x miniconda.sh


          # create python virtual environment
          ./miniconda.sh -b -p venv


          # activate the conda python virtual environment
          source venv/bin/activate ""


          # install PyFlink dependency
          if [[ $1 = "" ]]; then
          # install the latest version of pyflink
          pip install apache-flink
          else
          # install the specified version of pyflink
          pip install "apache-flink==$1"
          fi


          # deactivate the conda python virtual environment
          conda deactivate


          # remove the cached packages
          rm -rf venv/pkgs


          # package the prepared conda python virtual environment
          zip -r venv.zip venv
            sh setup-pyflink-virtual-env.sh 1.17.1

            脚本流程主要是先下载miniconda.sh构建Python虚拟环境venv、然后通过pip安装pyflink的依赖apache-flink、最后打成压缩包venv.zip。最终结果如下:

            venv虚拟环境验证

              source venv/bin/activate  # 声明环境变量
              python --version     # 查看Python版本号
              conda deactivate # 撤销环境变量

              pyflink依赖验证(local模式)

                source venv/bin/activate
                python /home/myHadoopCluster/flink-1.17.1/examples/python/datastream/word_count.py

                PyFlink on Yarn实践  

                通常真实现场环境都是Pyflink提交作业到yarn集群,使用统一的资源管理。针对Python虚拟环境的使用,分为三种方法:

                方法1:每个pyflink作业提交时自行上传venv.zip

                将示例代码和venv.zip放置到特定目录,如:/tmp/myApp

                  ./flink run-application  -t yarn-application   -Dyarn.ship-files=/tmp/myApp/ -pyarch myApp/venv.zip -pyclientexec venv.zip/venv/bin/python3 -pyexec venv.zip/venv/bin/python3 -pyfs myApp -pym word_count --output hdfs:///tmp/       

                  方法2:提前上传venv.zip到hdfs供所有pyflink作业使用

                    hdfs dfs -put ven.zip hdfs:///flink/

                      ./flink run-application  -t yarn-application -Dyarn.ship-files=/home/myHadoopCluster/flink-1.18.1/examples/python/datastream/  -pyarch hdfs:///flink/venv.zip  -pyclientexec venv.zip/venv/bin/python3 -pyexec venv.zip/venv/bin/python3 -pyfs datastream -pym word_count --output hdfs:///tmp    

                      方法3:提前在每个yarn集群节点本地放置相同路径venv.zip解压文件夹

                        ./flink run-application  -t yarn-application -pyclientexec /tmp/venv/bin/python3 -pyexec /tmp/venv/bin/python3 -Dyarn.ship-files=/home/myHadoopCluster/flink-1.18.1/examples/python/datastream/word_count.py -py word_count.py   --output hdfs:///tmp/

                        推荐使用方法3,性能最好。但是,需要注意一点:yarn集群节点扩容时,新节点需要部署相同venv目录即可!    

                        总结

                        本文记录如何使用conda构建Python虚拟环境、如何使用PyFlink整合使用FlinkML类库。

                        文章转载自大数据从业者,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                        评论