暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

17. PyFlink 集成 hive catalog模式读写数据

大数据技能圈 2022-12-10
53

本案例 Flink版本 1.15.2 ;hive 版本 2.3.3 hadoop 版本 2.8.2;jdk版本 1.8;python版本3.8。
Flink安装包及相关jar包下载链接:https://pan.baidu.com/s/1L5dOxdrcEdFGIef73-sleQ  提取码:brhl

一、准备工作
1. 保姆级安装CentOS教程文档
2. SSH客户端神器之 MobaXterm
3. CentOS免密设置
4. CentOS安装 JDK
5. CentOS安装MySQL
6. Hadoop 安装(集群版)
7. CentOS 安装 Hive(集群版)
8. Anaconda介绍、安装及使用
9. Jupyter Notebook中配置多版本Python
二、安装步骤
conda安装虚拟环境,--name后面的名称可以随意起,我这里叫做pyflink
    conda create --name pyflink python=3.8

    输入 y

    激活pyflink环境

      conda activate pyflink

      虚拟环境安装pyflink

        python -m pip install apache-flink==1.15.2

        复制flink环境中的jar包到pyflink中

          cp opt/software/flink-1.15.2/lib/* /opt/software/miniconda3/envs/pyflink/lib/python3.8/site-packages/pyflink/lib

          编写python代码 catalog.py

            from pyflink.table import *
            from pyflink.table.catalog import HiveCatalog, CatalogDatabase, ObjectPath, CatalogBaseTable
            catalog = HiveCatalog("myhive","caijing01", "/opt/software/hive/conf")
            t_env = TableEnvironment.create(environment_settings=EnvironmentSettings.in_batch_mode())
            t_env.register_catalog("myhive", catalog)
            t_env.execute_sql("select * from myhive.caijing01.score5").print()
            t_env.execute_sql("create table myhive.caijing01.score8 like myhive.caijing01.score5")
            t_env.execute_sql("insert into myhive.caijing01.score8 select * from myhive.caijing01.score5")

            或者

              from pyflink.table import *
              from pyflink.table.catalog import HiveCatalog
              settings = EnvironmentSettings.in_batch_mode()
              t_env = TableEnvironment.create(settings)
              source_ddl = """CREATE CATALOG myhive WITH ( 'type' = 'hive','default-database'='caijing01','hive-conf-dir' = '/opt/software/hive/conf')"""
              t_env.execute_sql(source_ddl)
              sql1= """select * from myhive.caijing01.score5"""
              t_env.execute_sql(sql1).print()

              执行py代码

                 opt/software/flink-1.15.2/bin/flink run -pyexec opt/software/miniconda3/envs/pyflink/bin/python -py opt/software/flink-1.15.2/code/catalog.py

                查看flink网页查看运行情况

                至此,Pyflink 集成 hive catalog模式读写数据完成。

                更多实战详情请关注字节智传公众号

                往期精彩

                12. Anaconda介绍、安装及使用

                13. Jupyter Notebook介绍、安装及使用

                14. Jupyter Notebook中配置多版本Python

                15. Anaconda 搭建pyflink开发环境

                16. FlinkSql 集成 hive catalog模式进行读写数据

                文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                评论