暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

技术创想 | 使用 Spring Batch + MyBatis-Spring + k8s job 完成数据清洗

领创集团Advance Group 2021-12-30
2521

本文主要介绍了在面对清洗数据需求时采用的 Spring Batch + MyBatis-Spring + k8s job 解决方案。介绍了选型思路(why),框架概述(what)并给出了一个使用demo(how)。


目录

  • 1. Why

  • 2. What

  • 2.1 概览及文档地址

  • 2.2 spring-batch

  • 2.3 MyBatis-Spring

  • 2.4 k8s job

  • 3. How

  • 3.1 需求说明

  • 3.2 方案

  • 3.3 代码

  • 3.4 执行结果演示

  • 3.5 避坑小tips


  • 4. 其他

    4.其他

Why

在实际的开发、生产过程中,我们难免会遇到因各种原因导致的数据问题。有些错误的数据是由系统bug 导致的,有一些是业务上的错误行为导致的。通常情况下越是下游的系统,越容易出现第二种情况。因此我们需要对这些脏数据或者无效数据进行清洗工作。

针对不同的场景,通常我们有以下解决办法:

  1. 对于可预见的问题,提前设计解决方案。

  2. 逻辑简单少量的数据可以直接通过sql修改。

  3. 复杂海量数据需要跑一个数据清洗的脚本。

让我们来讨论一下场景3。

在场景3中,由于我们是 java 开发者,由于 java 本身并不是一个脚本语言,python 等脚本语言掌握程度有限的情况下,一个常见的做法是:在某内网 only 的服务中添加一个后门接口,然后把清洗脚本写到这个接口内部,然后提交一次hotfix,再通过调用接口的方式执行脚本。

举个例子:当前版本的v1.0.0.0
, 然后提交一次包含数据清洗程序的 hotfixv1.0.0.1
,然后部署该版本的镜像,登陆服务器 pod 通过curl
命令执行后门接口。
然而这样的方式是值得商榷的,对于我来说它看上去不太舒服。
  • 程序本身没有为服务本身提供任何bug修复/新特性,却要重新发布一次某个服务,占用版本号
  • 清洗数据的程序可能会多服务于业务的本身造成灾难性影响,比如占满了数据库连接池或堆空间等。
  • 如果服务是多节点的,那么多个节点暴露的相同的接口可能会引起数据清洗服务的并发
  • 尽管程序只在内网中暴露,但总归还是有调用风险的
而我希望洗数据的服务
  • java编写,可以复用一些类如entity
    。降低编写脚本的复杂性,
  • 数据库操作使用框架而非原生jdbc,比如MyBatis,jpa等
  • 独立部署的服务,易于编写,且可以控制运行数量和并发
  • 脚本执行完毕后立刻成为不可用状态
  • 能够处理海量数据
经历一番探索后,我们选择了使用 spring batch
+ MyBatis-Spring
+ k8s job
的方式完成我们的需求

What

2.1 概览及文档地址

spring-batch
: 一个轻量级的、全面的批处理框架。(https://spring.io/projects/spring-batch)
MyBatis-Spring
: mybatis 是业务使用的数据库操作框架,自带 spring batch 支持。(https://mybatis.org/spring/zh/batch.html#)
k8s job
: 运行一次性任务,完成后容器就退出。(https://kubernetes.io/zh/docs/concepts/workloads/controllers/job/)
接下来的内容属于理解性内容,实际上框架提供能力远比我描述的更强大和完善,在这里我只对典型用法和设计模型加以描述,完整内容请看参考手册

2.2 spring-batch

spring-batch 作为一个批处理框架,其实把所有的批处理任务做了一种抽象。

  • 一个批处理任务就是一个job

  • 完成一个批处理任务,需要若干个阶段,比如这个阶段就是step

  • 每个阶段都可以抽象为如下步骤

    • 批处理任务中,每一个单独的元素称为 Item

    • 从某处读取一批A类型数据 : ItemReader<A>
      ,数据源可能是文件,如csv,xlxs,也可能来自某个数据库等

    •  处理读的数据A并将每个Item
      加工成要写入的数据B: ItemProcessor<A, B>

    •  将生成的数据B写入目标位置: ItemWriter<B>
      ,也许会写入文件,或者数据库等

  • 此之外,可能你job
    的执行前做一些准备工作(比如打印信息,确认数据源等),job执行后需要做一些善后工作(比如确认结果,发送邮件等,上传生成的文件等),你可以去配置JobExecutionListener
  • 海量数据处理时,我们无法一口气读完全部数据源,因此对数据源要进行分块,这个操作叫做chunk
另外3个比较重要的概念
  • JobRepository
    : 需要一个地方统一管理job,记录job的执行时间,状态,结果等,默认是数据库
  • JobLauncher
    :需要有发起调用的执行者,比如我们想让spring-boot启动就执行job,因此有CommandLineJobRunner
  • JobExplorer
    :需要查看job的执行结果
总结一下,通过 spring-batch
的抽象,我们就把执行批处理的过程解藕成了这样一个过程
  1. 创建 job,指定 job 名称
  2. 为该 job 创建 steps,指定 step 名称,分块 chunkSize大小
  3. 使用或自定义合适的 ItemReader<A>
    ItemProcessor<A,B>
    ,ItemWriter<B>
  4. 装备 ItemReader<A>
    ItemProcessor<A, B>
    ,ItemWriter<B>
    到对应的 step
  5. 装配 steps 到 job 上即可
    事情变得简单了。

2.3 MyBatis-Spring

接下来我们要面对这样一个问题:如何处理海量数据
比如我们要洗一张千万量级的表user,即数据源的输入为 select * from user
如果使用传统的写法肯定是行不通的,必然会引起oom
    @Select("Select * from user")
    List<User> findAll();
    所以在大数据量的情况下,通常写代码时我们会采用以下两种方案:

    1.使用ResultHandler<T>
    直接处理返回值代替 List<T>
    在内存中中转查询结果

    2.使用Cursor<T>
    来使用游标处理返回结果

    okay,至此我们得出的结论是,想要解决oom问题,本质上还得从上述两种方案入手。我们可以自己写一个reader,去按照上述的解决方案写mapper即可。但是这仍然不够爽,我们希望能像平时一样单纯的写sql的xml。
    MyBatis-Spring 项目察觉到了这样的需求,贴心的为我们准备好了 MyBatisCursorItemReader<T>
    至此问题再次简化。
    1. 我只需要像平时那样去xml里定义sql
    2. 把queryId配置在MyBatisCursorItemReader<T>
    至于写,其实我们没有必要太操心,因为数据经过分块后,每次都只是少量的写入,照原本的insert方式去写就好了。但你仍然可以使用 MyBatis-Spring
    提供的MyBatisBatchItemWriter
    简化写的配置。

    2.4 k8s job

    所谓job,就是在pod中启动某个服务,只启动一次,服务执行完毕后pod关闭。此pod只有两种重启策略,即 never
    onFailure

    接下来我们进行实操阶段,我也会在下面给出一个 k8s job 的 yaml 文件。

    How

    接下来我们实践一下

    3.1 需求说明

    假设有如下三张表user
    user1
    user2
      CREATE TABLE `user` (
      `id` BIGINT NOT NULL PRIMARY KEY,
      `name` VARCHAR(20) NULL,
      `age` INT NULL
      );


      CREATE TABLE `user1` (
      `id` BIGINT NOT NULL PRIMARY KEY,
      `name` VARCHAR(20) NOT NULL
      );


      CREATE TABLE `user2` (
      `id` BIGINT NOT NULL PRIMARY KEY,
      `age` INT NOT NULL
      );
      起初user
      表里只有id
      ,name
      age
      字段为null
      ,我们现在创造一个需求:
      • user1.name
        洗到user.name
      • user2.age
        洗到user.age
      (这个需求很简单,其实两句sql就可以清洗完毕,这里只是为了演示用法,可以脑补一个特别复杂的需求)

      3.2 方案

      • user1.name
        洗到user.name
        定义为step1,创建如下bean
        • ItemReader<User1> u1Reader
        • ItemProcessor<User1, User> u1Processor
        • ItemWriter<User> u1Writer
      • user2.age
        洗到`user.age定义为step2,创建如下bean
        • ItemReader<User2> u2Reader
        • ItemProcessor<User2, User> u2Processor
        • ItemWriter<User> u2Writer

      3.3 代码

      • build.gradle
        dependencies {
        implementation 'org.springframework.boot:spring-boot-starter-batch'
        implementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter:2.2.0'
        runtimeOnly 'mysql:mysql-connector-java:8.0.25'
        testImplementation 'org.springframework.boot:spring-boot-starter-test'
        testImplementation 'org.springframework.batch:spring-batch-test'
        }
        • src
          包结构:
          └── src
          └── main
          ├── java
          │ └── demo
          │ └── sbm
          │ ├── SpringBatchMybatisDemoApplication.java
          │ ├── config
          │ │ ├── MybatisConfig.java
          │ │ ├── SpringBatchConfig.java
          │ │ └── SpringBatchInMemoryConfig.java
          │ ├── entity
          │ │ ├── User.java
          │ │ ├── User1.java
          │ │ └── User2.java
          │ ├── mapper
          │ │ ├── User1Mapper.java
          │ │ ├── User2Mapper.java
          │ │ └── UserMapper.java
          │ └── processor
          │ ├── U1Processor.java
          │ └── U2Processor.java
          └── resources
          ├── application.yml
          └── mapper
          ├── User1Mapper.xml
          ├── User2Mapper.xml
          └── UserMapper.xml
          • SpringBatchConfig
            类,定义job
            step
            package demo.sbm.config;


            import demo.sbm.entity.User;
            import demo.sbm.entity.User1;
            import demo.sbm.entity.User2;
            import lombok.extern.slf4j.Slf4j;
            import org.apache.ibatis.session.SqlSessionFactory;
            import org.mybatis.spring.batch.MyBatisBatchItemWriter;
            import org.mybatis.spring.batch.MyBatisCursorItemReader;
            import org.mybatis.spring.batch.builder.MyBatisBatchItemWriterBuilder;
            import org.mybatis.spring.batch.builder.MyBatisCursorItemReaderBuilder;
            import org.springframework.batch.core.*;
            import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
            import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
            import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
            import org.springframework.batch.item.ItemProcessor;
            import org.springframework.batch.item.ItemReader;
            import org.springframework.batch.item.ItemWriter;
            import org.springframework.beans.factory.annotation.Qualifier;
            import org.springframework.context.annotation.Bean;
            import org.springframework.context.annotation.Configuration;


            @Slf4j
            @Configuration
            @EnableBatchProcessing
            public class SpringBatchConfig {
            @Bean
            public Job job(JobBuilderFactory jobBuilderFactory,
            @Qualifier("step1") Step step1,
            @Qualifier("step2") Step step2,
            JobExecutionListener listener) {
            return jobBuilderFactory.get("job")
            .start(step1)
            .next(step2)
            .listener(listener)
            .build();
            }


            @Bean
            public Step step1(StepBuilderFactory stepBuilderFactory,
            @Qualifier("u1Reader") ItemReader<User1> u1Reader,
            @Qualifier("u1Processor") ItemProcessor<User1, User> u1Processor,
            @Qualifier("u1Writer") ItemWriter<User> u1Writer) {
            return stepBuilderFactory.get("step1")
            .<User1, User>chunk(500)
            .reader(u1Reader)
            .processor(u1Processor)
            .writer(u1Writer)
            .build();
            }


            @Bean
            public Step step2(StepBuilderFactory stepBuilderFactory,
            @Qualifier("u2Reader") ItemReader<User2> u2Reader,
            @Qualifier("u2Processor") ItemProcessor<User2, User> u2Processor,
            @Qualifier("u2Writer") ItemWriter<User> u2Writer) {
            return stepBuilderFactory.get("step2")
            .<User2, User>chunk(500)
            .reader(u2Reader)
            .processor(u2Processor)
            .writer(u2Writer)
            .build();
            }


            @Bean
            public MyBatisCursorItemReader<User1> u1Reader(SqlSessionFactory ssf) {
            return new MyBatisCursorItemReaderBuilder<User1>()
            .sqlSessionFactory(ssf)
            .queryId("demo.sbm.mapper.User1Mapper.findAll")
            .build();
            }


            @Bean
            public MyBatisBatchItemWriter<User> u1Writer(@Qualifier("batchSsf") SqlSessionFactory ssf) {
            return new MyBatisBatchItemWriterBuilder<User>()
            .sqlSessionFactory(ssf)
            .statementId("demo.sbm.mapper.UserMapper.updateByPrimaryKey")
            .build();
            }


            @Bean
            public MyBatisCursorItemReader<User2> u2Reader(SqlSessionFactory ssf) {
            return new MyBatisCursorItemReaderBuilder<User2>()
            .sqlSessionFactory(ssf)
            .queryId("demo.sbm.mapper.User2Mapper.findAll")
            .build();
            }


            @Bean
            public MyBatisBatchItemWriter<User> u2Writer(@Qualifier("batchSsf") SqlSessionFactory ssf) {
            return new MyBatisBatchItemWriterBuilder<User>()
            .sqlSessionFactory(ssf)
            .statementId("demo.sbm.mapper.UserMapper.updateByPrimaryKey")
            .build();
            }


            @Bean
            public JobExecutionListener listener() {
            // 实现这个接口,复写beforeJob和afterJob即可
            // 如果有复杂的实现可以单独写个bean
            return new JobExecutionListener() {
            @Override
            public void beforeJob(JobExecution jobExecution) {
            BatchStatus status = jobExecution.getStatus();
            log.info("beforeJob! status is {}", status);
            }


            @Override
            public void afterJob(JobExecution jobExecution) {
            BatchStatus status = jobExecution.getStatus();
            log.info("afterJob! status is {}", status);
            }
            };
            }
            }
            • U1Processor
              user1
              转换成user
              实现类( U2Processor 与之写法类似,略)
              package demo.sbm.processor;


              import demo.sbm.entity.User;
              import demo.sbm.entity.User1;
              import demo.sbm.mapper.UserMapper;
              import org.springframework.batch.item.ItemProcessor;
              import org.springframework.stereotype.Component;


              @Component
              public class U1Processor implements ItemProcessor<User1, User> {
              private final UserMapper userMapper;


              public U1Processor(UserMapper userMapper) {
              this.userMapper = userMapper;
              }


              @Override
              public User process(User1 u1) {
              User user = userMapper.findById(u1.getId());
              // 返回null的数据不会进入writer
              if (user == null)
              return null;
              // user.name = u1.name
              user.setName(u1.getName());
              return user;
              }
              }
              • 最后说一下k8s job的写法
                apiVersion: batch/v1
                kind: Job
                metadata:
                name: sbm
                namespace: ns-name
                spec:
                template:
                spec:
                containers:
                - name: app
                # 镜像名称
                image: demo.sbm:v0.0.1
                imagePullPolicy: IfNotPresent
                # 环境变量等其他配置,同deployment的写法,略
                env:
                - name: JAVA_OPTS
                value: -Dspring.profiles.active=dev
                - name: TZ
                value: Asia/Shanghai
                resources:
                limits:
                cpu: "1"
                memory: 2Gi
                requests:
                cpu: 300m
                memory: 1.2Gi
                imagePullSecrets:
                - name: ...
                # 重启策略: Never - 不重启 OnFailure - 失败重启
                restartPolicy: Never
                文件创建好后直接在指定集群使用k apply -f k8s-job.yaml即可创建任务

                3.4 执行结果演示

                  . ____ _ __ _ _
                  \\ ___'_ __ _ _(_)_ __ __ _ \ \ \ \
                  ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
                  \\/ ___)| |_)| | | | | || (_| | ) ) ) )
                  ' |____| .__|_| |_|_| |_\__, |
                  =========|_|==============|___/=/_/_/_/
                  :: Spring Boot :: (v2.6.1)


                  2021-12-21 05:06:30.798 INFO 14073 --- [ main] d.sbm.SpringBatchMybatisDemoApplication : Starting SpringBatchMybatisDemoApplication using Java 11.0.7 on zhoutanghuis-MacBook-Pro.local with PID 14073 (/Users/akira/PersonalProjects/spring-batch-mybatis-demo/build/classes/java/main started by akira in Users/akira/PersonalProjects/spring-batch-mybatis-demo)
                  2021-12-21 05:06:30.800 INFO 14073 --- [ main] d.sbm.SpringBatchMybatisDemoApplication : No active profile set, falling back to default profiles: default
                  2021-12-21 05:06:32.651 INFO 14073 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
                  2021-12-21 05:06:33.264 INFO 14073 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
                  2021-12-21 05:06:33.381 INFO 14073 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor.
                  2021-12-21 05:06:33.386 WARN 14073 --- [ main] o.s.b.c.c.a.DefaultBatchConfigurer : No transaction manager was provided, using a DataSourceTransactionManager
                  2021-12-21 05:06:33.389 INFO 14073 --- [ main] o.s.b.c.r.s.JobRepositoryFactoryBean : No database type set, using meta data indicating: MYSQL
                  2021-12-21 05:06:33.509 INFO 14073 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor.
                  2021-12-21 05:06:33.742 INFO 14073 --- [ main] d.sbm.SpringBatchMybatisDemoApplication : Started SpringBatchMybatisDemoApplication in 3.234 seconds (JVM running for 3.981)
                  2021-12-21 05:06:33.745 INFO 14073 --- [ main] o.s.b.a.b.JobLauncherApplicationRunner : Running default command line with: []
                  2021-12-21 05:06:33.793 INFO 14073 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]] launched with the following parameters: [{}]
                  2021-12-21 05:06:33.815 INFO 14073 --- [ main] demo.sbm.config.SpringBatchConfig : beforeJob! status is STARTED
                  2021-12-21 05:06:33.822 INFO 14073 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
                  2021-12-21 05:29:07.195 INFO 14073 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step1] executed in 22m33s373ms
                  2021-12-21 05:29:07.198 INFO 14073 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step2]
                  2021-12-21 05:50:53.423 INFO 14073 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step2] executed in 21m46s225ms
                  2021-12-21 05:50:53.424 INFO 14073 --- [ main] demo.sbm.config.SpringBatchConfig : afterJob! status is COMPLETED
                  2021-12-21 05:50:53.425 INFO 14073 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 44m19s614ms
                  2021-12-21 05:50:53.429 INFO 14073 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
                  2021-12-21 05:50:53.434 INFO 14073 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.


                  Process finished with exit code 0

                  3.5 避坑小tips

                  • 最后说明一下 mybatis + batch 的一个小坑,这几乎是百分之百踩到的。

                  3.5.1如果不对数据源做任何配置,那基本在执行阶段会报如下错误Cannot change the ExecutorType when there is an existing transaction

                  TL,DR: mybatis增加如下配置
                    package demo.sbm.config;


                    import org.apache.ibatis.session.SqlSessionFactory;
                    import org.mybatis.spring.SqlSessionFactoryBean;
                    import org.mybatis.spring.boot.autoconfigure.SpringBootVFS;
                    import org.springframework.context.annotation.Bean;
                    import org.springframework.context.annotation.Configuration;
                    import org.springframework.context.annotation.Primary;
                    import org.springframework.core.io.support.PathMatchingResourcePatternResolver;


                    import javax.sql.DataSource;


                    /**
                    * Mybatis 配置多个 SqlSessionFactory
                    *
                    * @author Victor Zhou
                    */
                    @Configuration
                    public class MybatisConfig {


                    @Primary
                    @Bean(name = "ssf")
                    public SqlSessionFactory settlementSqlSessionFactory(DataSource ds) throws Exception {
                    return getSqlSessionFactory(ds);
                    }




                    @Bean(name = "batchSsf")
                    public SqlSessionFactory priceEngineSqlSessionFactory(DataSource ds) throws Exception {
                    return getSqlSessionFactory(ds);
                    }


                    private SqlSessionFactory getSqlSessionFactory(DataSource ds) throws Exception {
                    SqlSessionFactoryBean factory = new SqlSessionFactoryBean();
                    factory.setDataSource(ds);
                    factory.setVfs(SpringBootVFS.class);
                    factory.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:mapper/*.xml"));
                    org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
                    configuration.setMapUnderscoreToCamelCase(true);
                    factory.setConfiguration(configuration);
                    return factory.getObject();
                    }
                    }
                    然后配置MyBatisBatchItemWriter
                    的时候需要指定sqlSessionFactory
                      @Bean
                      public MyBatisBatchItemWriter<User> u2Writer(@Qualifier("batchSsf") SqlSessionFactory ssf) {
                      return new MyBatisBatchItemWriterBuilder<User>()
                      .sqlSessionFactory(ssf)
                      .statementId("demo.sbm.mapper.UserMapper.updateByPrimaryKey")
                      .build();
                      }
                      原因是sqlSessionTemplate有一个属性ExecutorType
                      ,该值在整个sqlSessionTemplate
                      使用期间无法更改。默认情况下,在创建时ExecutorType是SIMPLE
                      MyBatisBatchItemWriter
                      使用的是BATCH
                      我尝试去创建多个sqlSessionTemplate指定给writer,然而仍然无法解决问题。发现只要是公用同一sqlSessionFactory
                      ,当前存在事务执行时,多个sst的ExecutorType是不能切换的。
                      所以解决办法只能创建多个SqlSessionFactory
                      ,即上文tl dr的内容

                      3.5.2 默认的spring-batch
                      是使用数据库做 jobRepository
                      的,本例实用了内存jobRepository
                      代替数据库

                      相关配置在SpringBatchInMemoryConfig
                      这个类中
                        package demo.sbm.config;


                        import lombok.extern.slf4j.Slf4j;
                        import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
                        import org.springframework.batch.core.explore.JobExplorer;
                        import org.springframework.batch.core.explore.support.MapJobExplorerFactoryBean;
                        import org.springframework.batch.core.launch.JobLauncher;
                        import org.springframework.batch.core.launch.support.SimpleJobLauncher;
                        import org.springframework.batch.core.repository.JobRepository;
                        import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
                        import org.springframework.context.annotation.Bean;
                        import org.springframework.context.annotation.Configuration;


                        import java.util.Objects;


                        @Configuration
                        @Slf4j
                        public class SpringBatchInMemoryConfig {
                        @Bean
                        public DefaultBatchConfigurer batchConfigurer() {
                        return new DefaultBatchConfigurer() {


                        private final JobRepository jobRepository;
                        private final JobExplorer jobExplorer;
                        private final JobLauncher jobLauncher;


                        {


                        MapJobRepositoryFactoryBean jobRepositoryFactory = new MapJobRepositoryFactoryBean(); //NOSONAR
                        try {
                        this.jobRepository = jobRepositoryFactory.getObject();
                        MapJobExplorerFactoryBean jobExplorerFactory = new MapJobExplorerFactoryBean(jobRepositoryFactory); //NOSONAR
                        this.jobExplorer = jobExplorerFactory.getObject();
                        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
                        simpleJobLauncher.setJobRepository(Objects.requireNonNull(jobRepository));
                        simpleJobLauncher.afterPropertiesSet();
                        this.jobLauncher = simpleJobLauncher;
                        } catch (Exception e) {
                        log.error("Some exception occurred when initializing batch", e);
                        throw new IllegalStateException();
                        }
                        }


                        @Override
                        public JobRepository getJobRepository() {
                        return jobRepository;
                        }


                        @Override
                        public JobExplorer getJobExplorer() {
                        return jobExplorer;
                        }


                        @Override
                        public JobLauncher getJobLauncher() {
                        return jobLauncher;
                        }
                        };
                        }
                        }

                        其他

                        • k8s中,可以认为job
                          + cron
                          就是cronJob
                          使用同样的方式可以轻松定义一个cronJob完成每日的跑批任务。这种方式比传统的常驻式java程序+分布式定时任务框架更节省资源(使用时创建,完成时销毁)。
                        • 实际上spring batch能处理更复杂更抽象的逻辑,比如flow,tasklet,更高的性能,比如并发执行,在本文均未涉及。
                        • 本文演示用代码开源地址为: https://github.com/CoderVictor6/sbm

                        关于领创集团
                        (Advance Intelligence Group)
                        领创集团成立于2016年,致力于通过科技创新的本地化应用,改造和重塑金融和零售行业,以多元化的业务布局打造一个服务于消费者、企业和商户的生态圈。集团旗下包含企业业务和消费者业务两大板块,企业业务包含ADVANCE.AI和Ginee,分别为银行、金融、金融科技、零售和电商行业客户提供基于AI技术的数字身份验证、风险管理产品和全渠道电商服务解决方案;消费者业务Atome Financial包括亚洲最大的先享后付平台Atome和数字信贷服务。今年9月,领创集团宣布完成超4亿美元D轮融资,融资完成后领创集团估值已超20亿美元,成为新加坡最大的独立科技创业公司之一。

                        往期回顾
                        BREAK AWAY
                        技术创想 | Python 事件循环及协程原理

                        技术创想 | GitOps 系列之 Flux 实践篇


                        建议收藏 | Feign RequestInterceptor 使用简介



                        文章转载自领创集团Advance Group,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                        评论