✦
Flink 命令行实战
✦
bin/flink 是flink提供的CLI, 它链接了conf/flink-conf.yaml里面设置的运行的JobManager。基于此,我们可以来运行各种命令,比如启动任务,停止任务,保存状态点等。接下来挨个详细介绍。
Job Lifecycle Management
Submitting a Job
通过将作业的JAR和相关依赖项上传到Flink集群,来启动作业执行。我们可以选择examples/streaming/StateMachineExample.jar。
$ ./bin/flink run \--detached \./examples/streaming/StateMachineExample.jar
--detached 参数 可以使该命令在提交完成后返回。
Job Monitoring
我们可以使用list命令来监视任何正在运行的作业:
$ ./bin/flink list
Waiting for response...------------------ Running/Restarting Jobs -------------------30.11.2020 16:02:29 : cca7bc1061d61cf15238e92312c2fc20 : State machine job (RUNNING)--------------------------------------------------------------No scheduled jobs.
已提交但尚未启动的作业将列在“Scheduled Jobs”下面。
Creating a Savepoint
可以创建保存点来保存作业的当前状态。所需要的只是JobID:
$ ./bin/flink savepoint \$JOB_ID \tmp/flink-savepoints
Triggering savepoint for job cca7bc1061d61cf15238e92312c2fc20.Waiting for response...Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dabYou can resume your program from this savepoint with the run command.
保存点文件夹是可选的,如果没有设置state.savepoints.dir,则需要指定它。
保存点的路径稍后可用于重新启动Flink作业。
Disposing a Savepoint
使用相应的保存点路径需要添加: --dispose
$ ./bin/flink savepoint \--dispose \tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \$JOB_ID
Disposing savepoint '/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab'.Waiting for response...Savepoint '/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab' disposed.
如果使用自定义状态实例(例如自定义reducing state 或RocksDBstate),必须指定触发保存点的程序JAR的路径。否则会遇到ClassNotFoundException错误:
$ ./bin/flink savepoint \--dispose <savepointPath> \--jarfile <jarFile>
保存点一旦被使用后就会从存储中删除保存点数据,同时会使Flink清除与保存点相关的元数据。
Terminating a Job
优雅地停止作业并创建最终保存点
可以通过stop 命令来停止任务,这个命令可以使数据停止流动,并且能够使所有的source发出最后的checkpoint barrier,这些barrier将触发savepoint。
$ ./bin/flink stop \--savepointPath tmp/flink-savepoints \$JOB_ID
Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint.Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
如果没有设置state.savepoints.dir,我们必须使用 --savepointPath 来指定保存点文件夹。
取消作业
可以通过 cancel 命令 来取消任务
$ ./bin/flink cancel $JOB_ID
Cancelling job cca7bc1061d61cf15238e92312c2fc20.Cancelled job cca7bc1061d61cf15238e92312c2fc20.
相应的作业状态将从“运行”转换为“取消”。任何计算都将停止。
Terminating a Job
从savepoint重新启动任务
$ ./bin/flink run \--detached \--fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \./examples/streaming/StateMachineExample.jar
Usage with built-in data generator: StateMachineExample [--error-rate <probability-of-invalid-transition>] [--sleep <sleep-per-record-in-ms>]Usage with Kafka: StateMachineExample --kafka-topic <topic> [--brokers <brokers>]Options for both the above setups:[--backend <file|rocks>][--checkpoint-dir <filepath>][--async-checkpoints <true|false>][--incremental-checkpoints <true|false>][--output <filepath> OR null for stdout]Using standalone source with error rate 0.000000 and sleep delay 1 millisJob has been submitted with JobID 97b20a0a8ffd5c1d656328b0cd6436a6
可以看出来,这个命令跟正常的提交命令相比,多了一个 --fromSavepoint 参数。该参数用于引用先前停止的作业的状态。生成一个新的JobID,可用于维护作业。
默认情况下,我们尝试将整个保存点状态与提交的作业相匹配。如果希望允许跳过不能用新作业恢复的保存点状态,可以使用 --allowNonRestoredState 参数。如果我们修改了程序中的某个逻辑,该逻辑之前被保存在了savepoint,那么这个时候使用该参数就可以跳过被修改部分代码的状态恢复,其他部分的状态能够正常恢复使用。
$ ./bin/flink run \--fromSavepoint <savepointPath> \--allowNonRestoredState ...
Submitting PyFlink Jobs
目前,可以通过CLI提交PyFlink作业。它不需要指定JAR文件路径或条目主类,这与Java作业提交不同。
当通过flink run提交Python作业时,flink将运行命令" Python "。请运行以下命令以确认当前环境中的python可执行文件指向受支持的python 3.7+版本。
$ python --version# the version printed here must be 3.7+
以下命令显示了不同的PyFlink作业提交用例:
运行 PyFlink job:
$ ./bin/flink run --python examples/python/table/word_count.py
运行带有附加源文件和资源文件的PyFlink作业。在 --pyFiles 中指定的文件将被添加到PYTHONPATH中。
$ ./bin/flink run \--python examples/python/table/word_count.py \--pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt
运行一个需要引用Java UDF或外部连接器的PyFlink作业。在 --jarfile 中指定的JAR文件将被上传到集群。
$ ./bin/flink run \--python examples/python/table/word_count.py \--jarfile <jarFile>
使用 --pyModule 指定主入口模块运行PyFlink作业:
$ ./bin/flink run \--pyModule word_count \--pyFiles examples/python/table
指定在主机<jobmanagerHost>上运行的特定JobManager上提交PyFlink作业
$ ./bin/flink run \--jobmanager <jobmanagerHost>:8081 \--python examples/python/table/word_count.py
运行 PyFlink job 使用 YARN 集群 Per-Job 模式:
$ ./bin/flink run \--target yarn-per-job--python examples/python/table/word_count.py
运行 PyFlink job 使用 YARN 集群 Application 模式:
$ ./bin/flink run-application -t yarn-application \-Djobmanager.memory.process.size=1024m \-Dtaskmanager.memory.process.size=1024m \-Dyarn.application.name=<ApplicationName> \-Dyarn.ship-files=/path/to/shipfiles \-pyarch shipfiles/venv.zip \-pyclientexec venv.zip/venv/bin/python3 \-pyexec venv.zip/venv/bin/python3 \-pyfs shipfiles \-pym word_count
动动小手 关注我们




