暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Spring Cloud Data Flow 集成 Spark 数据实战

程序员Socket 2019-09-12
220

点击蓝色“泥瓦匠BYSocket”,关注我哟

后台回复“888”,领取资源哦

1.简介

SpringCloudDataFlow
是用于构建数据集成和实时数据处理管道的工具包。在这种情况下,管道(Pipelines)是使用 SpringCloudStream
SpringCloudTask
框架构建的 SpringBoot
应用程序。

在本教程中,我们将展示如何将 SpringCloudDataFlow
ApacheSpark
一起使用。

2.本地数据流服务

首先,我们需要运行数据流服务器(Data Flow Server)才能部署我们的作业( jobs
)。要在本地运行数据流服务器,需要使用 spring-cloud-starter-dataflow-server-local
依赖创建一个新项目:

  1. <dependency>

  2. <groupId>org.springframework.cloud</groupId>

  3. <artifactId>spring-cloud-starter-dataflow-server-local</artifactId>

  4. <version>1.7.4.RELEASE</version>

  5. </dependency>

之后,使用 @EnableDataFlowServer
来注解服务中的主类(main class):

  1. @EnableDataFlowServer

  2. @SpringBootApplication

  3. public class SpringDataFlowServerApplication {


  4. public static void main(String[] args) {

  5. SpringApplication.run(

  6. SpringDataFlowServerApplication.class, args);

  7. }

  8. }

运行此应用程序后,本地数据流服务运行在端口 9393

3.新建工程

我们将 SparkJob
作为本地单体应用程序创建,这样我们就不需要任何集群来运行它。

3.1依赖

首先,添加 Spark
依赖

  1. <dependency>

  2. <groupId>org.apache.spark</groupId>

  3. <artifactId>spark-core_2.10</artifactId>

  4. <version>2.4.0</version>

  5. </dependency>

3.2 创建job

job
来说,就是为了求 pi
的近似值:

  1. public class PiApproximation {

  2. public static void main(String[] args) {

  3. SparkConf conf = new SparkConf().setAppName("BaeldungPIApproximation");

  4. JavaSparkContext context = new JavaSparkContext(conf);

  5. int slices = args.length >= 1 ? Integer.valueOf(args[0]) : 2;

  6. int n = (100000L * slices) > Integer.MAX_VALUE ? Integer.MAX_VALUE : 100000 * slices;


  7. List<Integer> xs = IntStream.rangeClosed(0, n)

  8. .mapToObj(element -> Integer.valueOf(element))

  9. .collect(Collectors.toList());


  10. JavaRDD<Integer> dataSet = context.parallelize(xs, slices);


  11. JavaRDD<Integer> pointsInsideTheCircle = dataSet.map(integer -> {

  12. double x = Math.random() * 2 - 1;

  13. double y = Math.random() * 2 - 1;

  14. return (x * x + y * y ) < 1 ? 1: 0;

  15. });


  16. int count = pointsInsideTheCircle.reduce((integer, integer2) -> integer + integer2);


  17. System.out.println("The pi was estimated as:" + count / n);


  18. context.stop();

  19. }

  20. }

4. Data Flow Shell

DataFlowShell
是一个允许我们与服务器交互的应用程序。Shell
使用 DSL
命令来描述数据流。

要使用 DataFlowShell
,我们要创建一个运行它的项目。首先,需要 spring-cloud-dataflow-shell
依赖:

  1. <dependency>

  2. <groupId>org.springframework.cloud</groupId>

  3. <artifactId>spring-cloud-dataflow-shell</artifactId>

  4. <version>1.7.4.RELEASE</version>

  5. </dependency>

添加依赖项后,我们可以创建主类来运行 DataFlowShell

  1. @EnableDataFlowShell

  2. @SpringBootApplication

  3. public class SpringDataFlowShellApplication {


  4. public static void main(String[] args) {

  5. SpringApplication.run(SpringDataFlowShellApplication.class, args);

  6. }

  7. }

5.部署项目

为了部署我们的项目,可在三个版本( cluster
yarn
client
)中使用 ApacheSpark
所谓 任务运行器(task runner)
—— 我们将使用 client
版本。任务运行器(task runner)
是真正运行 Sparkjob
的实例。为此,我们首先需要使用 DataFlowShell
注册 task

  1. app register --type task --name spark-client --uri

  2. maven://org.springframework.cloud.task.app:spark-client-task:1.0.0.BUILD-SNAPSHOT

task
允许我们指定多个不同的参数,其中一些参数是可选的,但是一些参数是正确部署 Sparkjob
所必需的:

  • spark.app-class,已提交 job
    的主类

  • spark.app-jar,包含 job
    的 fat-jar
    路径

  • spark.app-name, job
    的名称

  • spark.app-args,将传递给 job
    的参数 我们可以使用注册的任务 spark-client
    提交我们的工作,记住提供所需的参数:

  1. task create spark1 --definition "spark-client \

  2. --spark.app-name=my-test-pi --spark.app-class=com.baeldung.spring.cloud.PiApproximation \

  3. --spark.app-jar=/apache-spark-job-0.0.1-SNAPSHOT.jar --spark.app-args=10"

请注意, spark.app-jar
是我们 job
fat-jar
的路径。成功创建任务后,我们可以使用以下命令继续运行它:

  1. task launch spark1

这将调用 task
的执行。

6.总结

在本教程中,我们展示了如何使用 SpringCloudDataFlow
框架来处理 ApacheSpark
数据。有关 SpringCloudDataFlow
框架的更多信息,请参阅文档。

所有代码示例都可以在 GitHub
上找到。

END




长按二维码,扫扫关注哦

关注即可得 Spring Boot Cloud、微服务等干货


文章转载自程序员Socket,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论