暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink 实战 | 命令行实战看这篇就够了!

大数据技能圈 2023-08-30
163

Flink 命令行实战

bin/flink 是flink提供的CLI, 它链接了conf/flink-conf.yaml里面设置的运行的JobManager。基于此,我们可以来运行各种命令,比如启动任务,停止任务,保存状态点等。接下来挨个详细介绍。

1

作业生命周期管理

Job Lifecycle Management


提交作业

Submitting a Job

通过将作业的JAR和相关依赖项上传到Flink集群,来启动作业执行。我们可以选择examples/streaming/StateMachineExample.jar。

    $ ./bin/flink run \
    --detached \
    ./examples/streaming/StateMachineExample.jar

    --detached 参数 可以使该命令在提交完成后返回。


    工作监控

    Job Monitoring


    我们可以使用list命令来监视任何正在运行的作业:

      $ ./bin/flink list
        Waiting for response...
        ------------------ Running/Restarting Jobs -------------------
        30.11.2020 16:02:29 : cca7bc1061d61cf15238e92312c2fc20 : State machine job (RUNNING)
        --------------------------------------------------------------
        No scheduled jobs.
        已提交但尚未启动的作业将列在“Scheduled Jobs”下面。

        创建保存点

        Creating a Savepoint

        可以创建保存点来保存作业的当前状态。所需要的只是JobID:

          $ ./bin/flink savepoint \
          $JOB_ID \
          tmp/flink-savepoints
            Triggering savepoint for job cca7bc1061d61cf15238e92312c2fc20.
            Waiting for response...
            Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
            You can resume your program from this savepoint with the run command.

            保存点文件夹是可选的,如果没有设置state.savepoints.dir,则需要指定它。

            保存点的路径稍后可用于重新启动Flink作业。

            使用检查点

            Disposing a Savepoint


            使用相应的保存点路径需要添加: --dispose

              $ ./bin/flink savepoint \ 
              --dispose \
              tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
              $JOB_ID
                Disposing savepoint '/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab'.
                Waiting for response...
                Savepoint '/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab' disposed.

                如果使用自定义状态实例(例如自定义reducing state 或RocksDBstate),必须指定触发保存点的程序JAR的路径。否则会遇到ClassNotFoundException错误:

                  $ ./bin/flink savepoint \
                  --dispose <savepointPath> \
                  --jarfile <jarFile>

                  保存点一旦被使用后就会从存储中删除保存点数据,同时会使Flink清除与保存点相关的元数据。

                  停止作业

                  Terminating a Job


                  优雅地停止作业并创建最终保存点

                  可以通过stop 命令来停止任务,这个命令可以使数据停止流动,并且能够使所有的source发出最后的checkpoint barrier,这些barrier将触发savepoint。

                    $ ./bin/flink stop \
                    --savepointPath tmp/flink-savepoints \
                    $JOB_ID
                      Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint.
                      Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab

                      如果没有设置state.savepoints.dir,我们必须使用 --savepointPath 来指定保存点文件夹。

                      取消作业

                      可以通过 cancel 命令 来取消任务

                        $ ./bin/flink cancel $JOB_ID
                          Cancelling job cca7bc1061d61cf15238e92312c2fc20.
                          Cancelled job cca7bc1061d61cf15238e92312c2fc20.

                          相应的作业状态将从“运行”转换为“取消”。任何计算都将停止。


                          重启作业

                          Terminating a Job


                          从savepoint重新启动任务

                            $ ./bin/flink run \
                            --detached \
                            --fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
                            ./examples/streaming/StateMachineExample.jar
                              Usage with built-in data generator: StateMachineExample [--error-rate <probability-of-invalid-transition>] [--sleep <sleep-per-record-in-ms>]
                              Usage with Kafka: StateMachineExample --kafka-topic <topic> [--brokers <brokers>]
                              Options for both the above setups:
                              [--backend <file|rocks>]
                              [--checkpoint-dir <filepath>]
                              [--async-checkpoints <true|false>]
                              [--incremental-checkpoints <true|false>]
                              [--output <filepath> OR null for stdout]


                              Using standalone source with error rate 0.000000 and sleep delay 1 millis


                              Job has been submitted with JobID 97b20a0a8ffd5c1d656328b0cd6436a6

                              可以看出来,这个命令跟正常的提交命令相比,多了一个 --fromSavepoint 参数。该参数用于引用先前停止的作业的状态。生成一个新的JobID,可用于维护作业。

                              默认情况下,我们尝试将整个保存点状态与提交的作业相匹配。如果希望允许跳过不能用新作业恢复的保存点状态,可以使用  --allowNonRestoredState 参数。如果我们修改了程序中的某个逻辑,该逻辑之前被保存在了savepoint,那么这个时候使用该参数就可以跳过被修改部分代码的状态恢复,其他部分的状态能够正常恢复使用。

                                $ ./bin/flink run \
                                --fromSavepoint <savepointPath> \
                                --allowNonRestoredState ...

                                2

                                提交python代码

                                Submitting PyFlink Jobs


                                目前,可以通过CLI提交PyFlink作业。它不需要指定JAR文件路径或条目主类,这与Java作业提交不同。

                                当通过flink run提交Python作业时,flink将运行命令" Python "。请运行以下命令以确认当前环境中的python可执行文件指向受支持的python 3.7+版本。

                                  $ python --version
                                  # the version printed here must be 3.7+

                                  以下命令显示了不同的PyFlink作业提交用例:

                                  • 运行 PyFlink job:

                                    $ ./bin/flink run --python examples/python/table/word_count.py
                                    • 运行带有附加源文件和资源文件的PyFlink作业。在 --pyFiles 中指定的文件将被添加到PYTHONPATH中。

                                      $ ./bin/flink run \
                                      --python examples/python/table/word_count.py \
                                      --pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt
                                      • 运行一个需要引用Java UDF或外部连接器的PyFlink作业。在 --jarfile 中指定的JAR文件将被上传到集群。

                                        $ ./bin/flink run \
                                        --python examples/python/table/word_count.py \
                                        --jarfile <jarFile>
                                        • 使用 --pyModule 指定主入口模块运行PyFlink作业:

                                          $ ./bin/flink run \
                                          --pyModule word_count \
                                                --pyFiles examples/python/table
                                          • 指定在主机<jobmanagerHost>上运行的特定JobManager上提交PyFlink作业

                                            $ ./bin/flink run \
                                            --jobmanager <jobmanagerHost>:8081 \
                                            --python examples/python/table/word_count.py
                                            • 运行 PyFlink job 使用 YARN 集群  Per-Job 模式:

                                              $ ./bin/flink run \
                                              --target yarn-per-job
                                              --python examples/python/table/word_count.py
                                              • 运行 PyFlink job 使用 YARN 集 Application 模式:

                                                $ ./bin/flink run-application -t yarn-application \
                                                -Djobmanager.memory.process.size=1024m \
                                                -Dtaskmanager.memory.process.size=1024m \
                                                -Dyarn.application.name=<ApplicationName> \
                                                -Dyarn.ship-files=/path/to/shipfiles \
                                                -pyarch shipfiles/venv.zip \
                                                -pyclientexec venv.zip/venv/bin/python3 \
                                                -pyexec venv.zip/venv/bin/python3 \
                                                -pyfs shipfiles \
                                                -pym word_count




                                                动动小手 关注我们




                                                文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                评论