暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

pyflink1.13版本的安装

原创 Clark Xu 2021-07-20
1854

1. 部署

下载docker下载pyflink

##下载docker docker pull python:3.8 docker images python:3.8 ##启动docker docker run -p 8000:8000 -i -d -v D:\it\workspace-code\pythonpath\src:/usr/python/src docker start 23bc20f392ca ##进入docker docker exec -it 23bc20f392ca /bin/sh winpty docker exec -it 23bc20f392ca bash ##安装vi apt-get update apt-get install vi ##配置Pip pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple vim /root/.config/pip/pip.conf [global] index-url=https://pypi.tuna.tsinghua.edu.cn/simple/ extra-index-url=https://pypi.mirrors.ustc.edu.cn/simple/ https://pypi.douban.com/simple/ https://pypi.org/simple timeout = 300 ##下载pyflink1.13 cd /usr/python/src/docker/pip pip download apache-flink==1.13

1.1 docker 部署 pyflink

docker pull python:3.8 docker images python:3.8 REPOSITORY TAG IMAGE ID CREATED SIZE python 3.8 ed68546ba062 5 days ago 883MB docker run -p 8080:8080 -i -d -v D:\it\workspace-code\pythonpath\src:/usr/python/src python:3.8 /bin/bash docker exec -it 23bc20f392ca /bin/sh python -c "import os;import os;print(os.path.dirname(os.path.abspath(os.__file__))+'/lib')" /usr/local/lib/python3.8/lib cd /usr/local/lib/python3.8 pip install apache-flink==1.13 #Successfully installed apache-beam-2.27.0 apache-flink-1.13.0 apache-flink-libraries-1.13.0 avro-python3-1.9.2.1 certifi-2020.12.5 chardet-4.0.0 cloudpickle-1.2.2 crcmod-1.7 dill-0.3.1.1 docopt-0.6.2 fastavro-0.23.6 future-0.18.2 grpcio-1.37.1 hdfs-2.6.0 httplib2-0.17.4 idna-2.10 mock-2.0.0 numpy-1.19.5 oauth2client-4.1.3 pandas-1.1.5 pbr-5.6.0 protobuf-3.17.0 py4j-0.10.8.1 pyarrow-2.0.0 pyasn1-0.4.8 pyasn1-modules-0.2.8 pydot-1.4.2 pymongo-3.11.4 pyparsing-2.4.7 python-dateutil-2.8.0 pytz-2021.1 requests-2.25.1 rsa-4.7.2 six-1.16.0 typing-extensions-3.7.4.3 urllib3-1.26.4 export JAVA_HOME='/usr/python/src/docker/jdk1.8.0_121' export PATH=$PATH:$JAVA_HOME/bin export CLASS_PATH=.:$JAVA_HOME/lib echo "export FLINK_HOME=/usr/local/lib/python3.8/site-packages/pyflink" >> .bashrc cd /usr/local/lib/python3.8/site-packages/pyflink bin/start-cluster.sh Starting cluster. Starting standalonesession daemon on host 23bc20f392ca. Starting taskexecutor daemon on host 23bc20f392ca.

1.1.1 下载jar包

cd /usr/local/lib/python3.8/site-packages/pyflink/lib curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.12/1.10.3/flink-jdbc_2.12-1.9.3.jar curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.9.3/flink-sql-connector-kafka_2.12-1.9.3.jar

1.1.2 hivecatalog

  • hive配置和hadoop-common
scp -r hadoop_test@172.18.168.10:/etc/hive/conf ./etc/hive #org.apache.hadoop.conf.Configuration #hadoop-common依赖 STAX2 and WOODSTOX scp hadoop_test@172.18.168.10:/opt/cloudera/parcels/CDH-6.1.0-1.cdh6.1.0.p0.770702/jars/hadoop-common-3.0.0-cdh6.1.0.jar . #WOODSTOX: com.ctc.wstx.io.InputBootstrapper scp hadoop_test@172.18.168.10:/opt/cloudera/parcels/CDH/jars/woodstox-core*.jar . #STAX2: curl -O https://repo1.maven.org/maven2/org/codehaus/woodstox/stax2-api/4.2.1/stax2-api-4.2.1.jar #org.apache.commons.logging.LogFactory curl -O https://mirrors.bfsu.edu.cn/apache//commons/logging/binaries/commons-logging-1.2-bin.zip cp commons-logging-1.2.jar $FLINK_HOME/lib #org.apache.hadoop.mapred.JobConf curl -O https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-mapred/0.22.0/hadoop-mapred-0.22.0.jar #org.apache.commons.configuration2.Configuration curl -O https://repo1.maven.org/maven2/org/apache/commons/commons-configuration2/2.2/commons-configuration2-2.2.jar #org.apache.hadoop.util.PlatformName curl -O https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-auth/2.6.0/hadoop-auth-2.6.0.jar #org.apache.htrace.core.Tracer$Builder curl -O https://repo1.maven.org/maven2/org/apache/htrace/htrace-core4/4.2.0-incubating/htrace-core4-4.2.0-incubating.jar
  • conector包
cd /usr/local/lib/python3.8/site-packages/pyflink/lib curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.6_2.12/1.13.1/flink-sql-connector-hive-2.3.6_2.12-1.13.1.jar curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.9.3/flink-connector-hive_2.12-1.9.3.jar curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-hcatalog_2.12/1.9.3/flink-hcatalog_2.12-1.9.3.jar curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-hcatalog/1.6.4/flink-hcatalog-1.6.4.jar
  • 调试
/usr/python/src/unicompayment.com/xuzl70/utils/flink_table

1.1.3 执行 SQL 语句

说明:

  • TableEnvironment 中提供了多种方式用于执行 SQL 语句,其用途略有不同:
方法名 使用说明
sql_query 用来执行 SELECT 语句
sql_update 用来执行 INSERT 语句 / CREATE TABLE 语句。该方法已经被 deprecate,推荐使用 execute_sql 或者create_statement_set 替代。
create_statement_set 用来执行多条 SQL 语句,可以通过该方法编写 multi-sink 的作业。
execute_sql 用来执行单条 SQL 语句。execute_sql VS create_statement_set: 前者只能执行单条 SQL 语句,后者可用于执行多条 SQL 语句 execute_sql VS sql_query:前者可用于执行各种类型的 SQL 语句,比如 DDL、 DML、DQL、SHOW、DESCRIBE、EXPLAIN、USE 等,后者只能执行 DQL 语句即使是 DQL 语句,两者的行为也不一样。前者会生成 Flink 作业,触发表数据的计算,返回 TableResult 类型,后者并不触发计算,仅对表进行逻辑变换,返回 Table 类型

1.1.4 作业

参数说明

通过 flink run 提交的时候,还有其它一些和 PyFlink 作业相关的参数。

参数名 用途描述 示例
-py / --python 指定作业的入口文件 -py file:///path/to/table_api_demo.py
-pym / --pyModule 指定作业的 entry module,功能和–python类似,可用于当作业的 Python 文件为 zip 包,无法通过–python 指定时,相比–python 来说,更通用 -pym table_api_demo -pyfs file:///path/to/table_api_demo.py
-pyfs / --pyFiles 指定一个到多个 Python 文件(.py/.zip等,逗号分割),这些 Python 文件在作业执行的时候,会放到 Python 进程的 PYTHONPATH 中,可以在 Python 自定义函数中访问到 -pyfs file:///path/to/table_api_demo.py,file:///path/to/deps.zip
-pyarch / --pyArchives 指定一个到多个存档文件(逗号分割),这些存档文件,在作业执行的时候,会被解压之后,放到 Python 进程的 workspace 目录,可以通过相对路径的方式进行访问 -pyarch file:///path/to/venv.zip
-pyexec / --pyExecutable 指定作业执行的时候,Python 进程的路径 -pyarch file:///path/to/venv.zip -pyexec venv.zip/venv/bin/python3
-pyreq / --pyRequirements 指定 requirements 文件,requirements 文件中定义了作业的依赖 -pyreq requirements.txt

local

说明:使用该方式执行作业时,会启动一个 minicluster,作业会提交到 minicluster 中执行,该方式适合作业开发阶段。

示例:python3 table_api_demo.py

standalone

说明:使用该方式执行作业时,作业会提交到一个远端的 standalone 集群。

./bin/flink run --jobmanager localhost:8081 --python table_api_demo.py cd /usr/python/src/unicompayment.com/xuzl70/utils $FLINK_HOME/./bin/flink run --jobmanager localhost:8000 --python ./flink_table/table_demo1.py $FLINK_HOME/./bin/flink run --jobmanager localhost:8000 --python ./flink_datastream/datastream_demo1.py

YARN Per-Job

说明:使用该方式执行作业时,作业会提交到一个远端的 YARN 集群。

示例:

./bin/flink run --target yarn-per-job --python table_api_demo.py

K8s application mode

说明:使用该方式执行作业时,作业会提交到 K8s 集群,以 application mode 的方式执行。

示例:

./bin/flink run-application \ --target kubernetes-application \ --parallelism 8 \ -Dkubernetes.cluster-id=<ClusterId> \ -Dtaskmanager.memory.process.size=4096m \ -Dkubernetes.taskmanager.cpu=2 \ -Dtaskmanager.numberOfTaskSlots=4 \ -Dkubernetes.container.image=<PyFlinkImageName> \

–pyModule table_api_demo \ --pyFiles file:///path/to/table_api_demo.py

2. datastream

2.1 kafka

  • 配置
cat /etc/hosts #127.0.0.1 localhost #172.20.0.3 kafka #172.20.0.2 zookeeper grep 'advertised.host.name' config/server.properties #listeners=PLAINTEXT://0.0.0.0:9092 #zookeeper.connect=zookeeper:2181 #advertised.host.name=kafka #port=9092 bin/kafka-topics.sh --bootstrap-server kafka:9092 --list

2.1.1 kafka-input

./bin/kafka-topics.sh --describe --bootstrap-server kafka:9092 --topic topic001 bin/kafka-console-producer.sh --topic topic001 --bootstrap-server kafka:9092 bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic topic001 --from-beginning

2.1.2 kafka output

bin/kafka-topics.sh --bootstrap-server kafka:9092 --create --topic output001 --partitions 1 --replication-factor 1 bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --topic output001 --partitions 1 --replication-factor 1 ./bin/kafka-topics.sh --describe --bootstrap-server kafka:9092 --topic topic001 bin/kafka-console-producer.sh --topic output001 --bootstrap-server kafka:9092 bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic output001 --from-beginning
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论