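This article describes the Spring Batch + MyBatis-Spring + k8s Job solution we adopted for a data-cleaning requirement. It covers the reasoning behind the choice (Why), an overview of the frameworks (What), and a working demo (How).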
Why
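When dirty data has to be fixed, we usually pick an approach based on the scenario:
1. For foreseeable problems, design a solution in advance.
2. For simple logic and a small amount of data, fix it directly with SQL.
3. For complex logic or massive amounts of data, run a data-cleaning script.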
Let's discuss scenario 3.
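In scenario 3, we are Java developers, Java is not a scripting language, and our command of scripting languages such as Python is limited. A common practice is therefore to add a backdoor endpoint to some internal-only service, put the cleaning script inside that endpoint, ship it as a hotfix, and then run the script by calling the endpoint. For example, if the service is at v1.0.0.0, we submit a hotfix v1.0.0.1 containing the data-cleaning code, deploy that image, log in to the pod, and invoke the backdoor endpoint with `curl`.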
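This approach has several drawbacks:
- The program provides no bug fix or new feature for the service, yet the service has to be released again and a version number is used up.
- The cleaning program can seriously harm the service's own business traffic, for example by exhausting the database connection pool or the heap.
- If the service runs on multiple nodes, the same endpoint is exposed on every node, which can lead to the cleaning job running concurrently.
- Even though the endpoint is only exposed on the internal network, there is still a risk of it being called by mistake.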
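What we want instead is:
- Written in Java, so existing classes such as entities can be reused, which keeps the script simple to write.
- Database access through a framework rather than raw JDBC, e.g. MyBatis or JPA.
- An independently deployed service that is easy to write and whose number of runs and concurrency we control.
- Becomes unavailable as soon as the script finishes.
- Able to process massive amounts of data.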
Based on these requirements, we chose Spring Batch + MyBatis-Spring + k8s Job.

What
2.1 Overview and documentation links
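- spring-batch: a lightweight, comprehensive batch-processing framework. (https://spring.io/projects/spring-batch)
- MyBatis-Spring: MyBatis is the database framework our business code already uses, and MyBatis-Spring ships with Spring Batch support. (https://mybatis.org/spring/zh/batch.html#)
- k8s Job: runs a one-off task; the container exits once the task completes. (https://kubernetes.io/zh/docs/concepts/workloads/controllers/job/)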
2.2 spring-batch
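As a batch-processing framework, Spring Batch essentially provides a single abstraction for all batch tasks:
- A batch task is a `Job`.
- Completing a job takes several stages; each stage is a `Step`.
- Each individual element in a batch task is called an `Item`. Every step can be abstracted into the following actions:
  1. Read a batch of type-A data from somewhere: `ItemReader<A>`. The source may be a file such as CSV or XLSX, a database, and so on.
  2. Process each `Item` of the data A into the data B to be written: `ItemProcessor<A, B>`.
  3. Write the resulting data B to its destination: `ItemWriter<B>`, which might be a file, a database, and so on.
- You may also want to do some preparation before the job runs (e.g. printing information, checking the data source) and some cleanup after it finishes (e.g. verifying results, sending emails, uploading generated files). For this you can configure a `JobExecutionListener`.
- When processing massive data we cannot read the whole source in one go, so the source is split into blocks; this is called a chunk.
- `JobRepository`: a central place that manages jobs and records their execution time, status, results, etc. By default it is backed by a database.
- `JobLauncher`: something has to launch the job. For example, to run the job as soon as Spring Boot starts, Spring Boot's `JobLauncherApplicationRunner` calls the `JobLauncher` at startup.
- `JobExplorer`: used to inspect job execution results.

With this abstraction, the batch process is decoupled into the following steps:
1. Create the job and give it a name.
2. Create the job's steps, giving each a name and a chunk size (`chunkSize`).
3. Use existing, or write your own, `ItemReader<A>`, `ItemProcessor<A, B>`, and `ItemWriter<B>` implementations.
4. Wire the `ItemReader<A>`, `ItemProcessor<A, B>`, and `ItemWriter<B>` into the corresponding step.
5. Wire the steps into the job.

That's all; things become simple.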
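To make the chunk-oriented read/process/write cycle concrete, here is a stdlib-only sketch. `Reader`, `Processor`, `Writer`, and `ChunkLoop` are hypothetical stand-ins that mirror the contracts described above (a `null` from the reader ends the input, a `null` from the processor filters the item out); they are not Spring Batch's real classes, and transactions, restart, skip, and retry are deliberately left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-ins mirroring ItemReader / ItemProcessor / ItemWriter (not the Spring types).
interface Reader<A> {
    A read(); // returns null when the input is exhausted
}

interface Processor<A, B> {
    B process(A item); // returning null filters the item out
}

interface Writer<B> {
    void write(List<? extends B> chunk); // receives one whole chunk at a time
}

final class ChunkLoop {
    // Runs one "step": read and process up to chunkSize items, then write what survived.
    static <A, B> int runStep(Reader<A> reader, Processor<A, B> processor,
                              Writer<B> writer, int chunkSize) {
        int totalRead = 0;
        boolean exhausted = false;
        while (!exhausted) {
            List<B> chunk = new ArrayList<>(chunkSize);
            int readInChunk = 0;
            // A chunk is complete after chunkSize items have been READ (filtered ones count too).
            while (readInChunk < chunkSize) {
                A item = reader.read();
                if (item == null) {
                    exhausted = true;
                    break;
                }
                readInChunk++;
                totalRead++;
                B out = processor.process(item);
                if (out != null) {
                    chunk.add(out);
                }
            }
            // In Spring Batch each chunk is processed and written inside its own transaction.
            if (!chunk.isEmpty()) {
                writer.write(chunk);
            }
        }
        return totalRead;
    }

    // Adapts any Iterator to the reader contract: null signals end of input.
    static <T> Reader<T> fromIterator(Iterator<T> it) {
        return () -> it.hasNext() ? it.next() : null;
    }
}
```

With a chunk size of 3 and a processor that drops multiples of 3, the input 1..7 is written as three chunks: `[item-1, item-2]`, `[item-4, item-5]`, `[item-7]`.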
2.3 MyBatis-Spring
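Normally, to query a table with MyBatis we would write something like this for `select * from user`:

```java
@Select("Select * from user")
List<User> findAll();
```

This loads the entire result set into a `List<User>`; with massive data, holding all of it in memory can exhaust the heap. MyBatis offers two ways around this:
1. Use `ResultHandler<T>` to handle each returned row directly, instead of buffering the query result in memory as a `List<T>`.
2. Use `Cursor<T>` to process the returned rows through a cursor.

Building on the cursor approach, MyBatis-Spring provides `MyBatisCursorItemReader<T>`, which simplifies the problem once again: I just define the SQL in the mapper XML as usual and configure its `queryId` on the `MyBatisCursorItemReader<T>`. Likewise, MyBatis-Spring provides `MyBatisBatchItemWriter` to simplify the write side.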
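The difference between returning a `List<T>` and the two streaming options can be sketched in plain Java without any database. The names `FakeCursor`, `Streaming.handleAll`, and `CursorReader` are hypothetical: the first stands in for a database cursor, the second mimics the push style of MyBatis's `ResultHandler<T>`, and the third mimics the pull style of a `Cursor<T>` wrapped as an item reader, roughly what `MyBatisCursorItemReader<T>` does, returning `null` at end of input.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Consumer;

// Hypothetical stand-in for a database cursor: ids 1..total are produced on demand,
// never materialized as a List.
final class FakeCursor implements Iterator<Long> {
    private final long total;
    private long next = 1;

    FakeCursor(long total) { this.total = total; }

    @Override
    public boolean hasNext() { return next <= total; }

    @Override
    public Long next() {
        if (!hasNext()) throw new NoSuchElementException();
        return next++;
    }
}

// Push style (like ResultHandler<T>): each row is handed to a callback, then dropped.
final class Streaming {
    static <T> long handleAll(Iterator<T> cursor, Consumer<T> handler) {
        long count = 0;
        while (cursor.hasNext()) {
            handler.accept(cursor.next());
            count++;
        }
        return count;
    }
}

// Pull style (like Cursor<T> behind an ItemReader): one row per read(), null means "no more input".
final class CursorReader<T> {
    private final Iterator<T> cursor;

    CursorReader(Iterator<T> cursor) { this.cursor = cursor; }

    T read() { return cursor.hasNext() ? cursor.next() : null; }
}
```

In both styles only the current row is held at any time, which is what makes a million-row scan feasible with a bounded heap.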
2.4 k8s job
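A Job starts a service in a pod exactly once; when the service finishes, the pod stops. Such a pod supports only two restart policies: `Never` and `OnFailure`.

Next comes the hands-on part, where I will also provide a k8s Job YAML file.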
How
3.1 Requirements
Suppose there are three tables: `user`, `user1`, and `user2`.

```sql
CREATE TABLE `user` (
    `id`   BIGINT      NOT NULL PRIMARY KEY,
    `name` VARCHAR(20) NULL,
    `age`  INT         NULL
);

CREATE TABLE `user1` (
    `id`   BIGINT      NOT NULL PRIMARY KEY,
    `name` VARCHAR(20) NOT NULL
);

CREATE TABLE `user2` (
    `id`  BIGINT NOT NULL PRIMARY KEY,
    `age` INT    NOT NULL
);
```

The `user` table only has `id` populated; its `name` and `age` fields are `null`. Now we make up a requirement:
- copy `user1.name` into `user.name`
- copy `user2.age` into `user.age`
3.2 Approach
Define copying `user1.name` into `user.name` as step1, with the following beans:
- `ItemReader<User1> u1Reader`
- `ItemProcessor<User1, User> u1Processor`
- `ItemWriter<User> u1Writer`

Define copying `user2.age` into `user.age` as step2, with the following beans:
- `ItemReader<User2> u2Reader`
- `ItemProcessor<User2, User> u2Processor`
- `ItemWriter<User> u2Writer`
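The intended end state of step1 and step2 can be modeled in memory before looking at the Spring code. This is a hypothetical sketch: `CleaningDemo`, `UserRow`, and the sample ids are illustrative only. It assumes that `user1`/`user2` rows whose id has no matching `user` row are simply skipped, matching how `U1Processor` below returns `null` when the user is not found.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CleaningDemo {
    // In-memory stand-in for a row of `user`; name and age start out null.
    static final class UserRow {
        final long id;
        String name;
        Integer age;

        UserRow(long id) { this.id = id; }
    }

    // Builds the `user` table with only ids populated.
    static Map<Long, UserRow> userTable(List<Long> ids) {
        Map<Long, UserRow> table = new LinkedHashMap<>();
        for (long id : ids) {
            table.put(id, new UserRow(id));
        }
        return table;
    }

    // step1: user.name = user1.name for every matching id.
    static void step1(Map<Long, UserRow> user, Map<Long, String> user1Names) {
        user1Names.forEach((id, name) -> {
            UserRow row = user.get(id);
            if (row != null) { // unmatched ids are skipped
                row.name = name;
            }
        });
    }

    // step2: user.age = user2.age for every matching id.
    static void step2(Map<Long, UserRow> user, Map<Long, Integer> user2Ages) {
        user2Ages.forEach((id, age) -> {
            UserRow row = user.get(id);
            if (row != null) {
                row.age = age;
            }
        });
    }
}
```

Because the two steps touch different columns, running step2 after step1 never overwrites what step1 wrote.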
3.3 Code
build.gradle

```groovy
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    implementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter:2.2.0'
    runtimeOnly 'mysql:mysql-connector-java:8.0.25'
    // Lombok provides the @Slf4j annotation used by the config classes
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.batch:spring-batch-test'
}
```
Package structure under `src`:

```text
└── src
    └── main
        ├── java
        │   └── demo
        │       └── sbm
        │           ├── SpringBatchMybatisDemoApplication.java
        │           ├── config
        │           │   ├── MybatisConfig.java
        │           │   ├── SpringBatchConfig.java
        │           │   └── SpringBatchInMemoryConfig.java
        │           ├── entity
        │           │   ├── User.java
        │           │   ├── User1.java
        │           │   └── User2.java
        │           ├── mapper
        │           │   ├── User1Mapper.java
        │           │   ├── User2Mapper.java
        │           │   └── UserMapper.java
        │           └── processor
        │               ├── U1Processor.java
        │               └── U2Processor.java
        └── resources
            ├── application.yml
            └── mapper
                ├── User1Mapper.xml
                ├── User2Mapper.xml
                └── UserMapper.xml
```
The `SpringBatchConfig` class defines the job and its steps:
```java
package demo.sbm.config;

import demo.sbm.entity.User;
import demo.sbm.entity.User1;
import demo.sbm.entity.User2;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisBatchItemWriter;
import org.mybatis.spring.batch.MyBatisCursorItemReader;
import org.mybatis.spring.batch.builder.MyBatisBatchItemWriterBuilder;
import org.mybatis.spring.batch.builder.MyBatisCursorItemReaderBuilder;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
@EnableBatchProcessing
public class SpringBatchConfig {

    // The job: step1, then step2, wrapped by the listener
    @Bean
    public Job job(JobBuilderFactory jobBuilderFactory,
                   @Qualifier("step1") Step step1,
                   @Qualifier("step2") Step step2,
                   JobExecutionListener listener) {
        return jobBuilderFactory.get("job")
                .start(step1)
                .next(step2)
                .listener(listener)
                .build();
    }

    // step1: user1 -> user, committed every 500 items
    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory,
                      @Qualifier("u1Reader") ItemReader<User1> u1Reader,
                      @Qualifier("u1Processor") ItemProcessor<User1, User> u1Processor,
                      @Qualifier("u1Writer") ItemWriter<User> u1Writer) {
        return stepBuilderFactory.get("step1")
                .<User1, User>chunk(500)
                .reader(u1Reader)
                .processor(u1Processor)
                .writer(u1Writer)
                .build();
    }

    // step2: user2 -> user, committed every 500 items
    @Bean
    public Step step2(StepBuilderFactory stepBuilderFactory,
                      @Qualifier("u2Reader") ItemReader<User2> u2Reader,
                      @Qualifier("u2Processor") ItemProcessor<User2, User> u2Processor,
                      @Qualifier("u2Writer") ItemWriter<User> u2Writer) {
        return stepBuilderFactory.get("step2")
                .<User2, User>chunk(500)
                .reader(u2Reader)
                .processor(u2Processor)
                .writer(u2Writer)
                .build();
    }

    // Readers use the primary SqlSessionFactory ("ssf")
    @Bean
    public MyBatisCursorItemReader<User1> u1Reader(SqlSessionFactory ssf) {
        return new MyBatisCursorItemReaderBuilder<User1>()
                .sqlSessionFactory(ssf)
                .queryId("demo.sbm.mapper.User1Mapper.findAll")
                .build();
    }

    // Writers use the dedicated "batchSsf" (see 3.5.1)
    @Bean
    public MyBatisBatchItemWriter<User> u1Writer(@Qualifier("batchSsf") SqlSessionFactory ssf) {
        return new MyBatisBatchItemWriterBuilder<User>()
                .sqlSessionFactory(ssf)
                .statementId("demo.sbm.mapper.UserMapper.updateByPrimaryKey")
                .build();
    }

    @Bean
    public MyBatisCursorItemReader<User2> u2Reader(SqlSessionFactory ssf) {
        return new MyBatisCursorItemReaderBuilder<User2>()
                .sqlSessionFactory(ssf)
                .queryId("demo.sbm.mapper.User2Mapper.findAll")
                .build();
    }

    @Bean
    public MyBatisBatchItemWriter<User> u2Writer(@Qualifier("batchSsf") SqlSessionFactory ssf) {
        return new MyBatisBatchItemWriterBuilder<User>()
                .sqlSessionFactory(ssf)
                .statementId("demo.sbm.mapper.UserMapper.updateByPrimaryKey")
                .build();
    }

    @Bean
    public JobExecutionListener listener() {
        // Implement this interface and override beforeJob and afterJob
        // For a more complex implementation, define it as a separate bean
        return new JobExecutionListener() {
            @Override
            public void beforeJob(JobExecution jobExecution) {
                BatchStatus status = jobExecution.getStatus();
                log.info("beforeJob! status is {}", status);
            }

            @Override
            public void afterJob(JobExecution jobExecution) {
                BatchStatus status = jobExecution.getStatus();
                log.info("afterJob! status is {}", status);
            }
        };
    }
}
```
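What the `job` bean wires together can be modeled with a small stdlib-only sketch: steps run in order, the listener's `beforeJob`/`afterJob` wrap them, and a failing step stops the job before later steps run. `MiniJob` and its string statuses are hypothetical simplifications, not Spring Batch's `SimpleJob`; the real listener receives a `JobExecution` rather than a string.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of start(step1).next(step2).listener(listener); not Spring Batch code.
final class MiniJob {
    interface Listener {
        void beforeJob(String status);
        void afterJob(String status);
    }

    private final List<Runnable> steps = new ArrayList<>();
    private final Listener listener;

    MiniJob(Listener listener) { this.listener = listener; }

    MiniJob next(Runnable step) {
        steps.add(step);
        return this;
    }

    String run() {
        String status = "STARTED";
        listener.beforeJob(status);
        try {
            for (Runnable step : steps) {
                step.run(); // a step starts only after the previous one finished
            }
            status = "COMPLETED";
        } catch (RuntimeException e) {
            status = "FAILED"; // remaining steps are skipped
        } finally {
            listener.afterJob(status); // called whether or not the job succeeded
        }
        return status;
    }
}
```

A job with two successful steps logs `before:STARTED`, `step1`, `step2`, `after:COMPLETED`, the same STARTED/COMPLETED pair the demo's listener prints in the log in 3.4.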
`U1Processor`, the implementation that converts a `user1` row into a `user` (`U2Processor` is written the same way and omitted here):
```java
package demo.sbm.processor;

import demo.sbm.entity.User;
import demo.sbm.entity.User1;
import demo.sbm.mapper.UserMapper;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

@Component
public class U1Processor implements ItemProcessor<User1, User> {

    private final UserMapper userMapper;

    public U1Processor(UserMapper userMapper) {
        this.userMapper = userMapper;
    }

    @Override
    public User process(User1 u1) {
        User user = userMapper.findById(u1.getId());
        // Items for which the processor returns null never reach the writer
        if (user == null) {
            return null;
        }
        // user.name = u1.name
        user.setName(u1.getName());
        return user;
    }
}
```
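The article omits `U2Processor`. Assuming it mirrors `U1Processor` with `age` in place of `name`, its logic would look like the following sketch. To keep it runnable without Spring, the minimal `User`/`User2` classes and the `LongFunction<User>` lookup (standing in for `userMapper.findById`) are assumptions; in the project the class would implement `ItemProcessor<User2, User>` and receive `UserMapper` through its constructor, exactly like `U1Processor`.

```java
import java.util.function.LongFunction;

// Minimal stand-ins for the demo's entities (the real ones live in demo.sbm.entity).
class User {
    private final long id;
    private String name;
    private Integer age;

    User(long id, String name, Integer age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    long getId() { return id; }
    String getName() { return name; }
    Integer getAge() { return age; }
    void setAge(Integer age) { this.age = age; }
}

class User2 {
    private final long id;
    private final int age;

    User2(long id, int age) {
        this.id = id;
        this.age = age;
    }

    long getId() { return id; }
    int getAge() { return age; }
}

// Hypothetical U2Processor logic: same shape as U1Processor, copying age instead of name.
class U2ProcessorSketch {
    private final LongFunction<User> findById; // stands in for userMapper.findById

    U2ProcessorSketch(LongFunction<User> findById) { this.findById = findById; }

    User process(User2 u2) {
        User user = findById.apply(u2.getId());
        if (user == null) {
            return null; // filtered: never passed to the writer
        }
        user.setAge(u2.getAge()); // user.age = u2.age
        return user;
    }
}
```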
Finally, here is how the k8s Job is written:
```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: sbm
  namespace: ns-name
spec:
  template:
    spec:
      containers:
        - name: app
          # Image name
          image: demo.sbm:v0.0.1
          imagePullPolicy: IfNotPresent
          # Environment variables and other settings are written as in a Deployment; details omitted
          env:
            - name: JAVA_OPTS
              value: -Dspring.profiles.active=dev
            - name: TZ
              value: Asia/Shanghai
          resources:
            limits:
              cpu: "1"
              memory: 2Gi
            requests:
              cpu: 300m
              memory: 1.2Gi
      imagePullSecrets:
        - name: ...
      # Restart policy: Never = do not restart; OnFailure = restart on failure
      restartPolicy: Never
```
3.4 Execution results
```text
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.1)

2021-12-21 05:06:30.798 INFO 14073 --- [ main] d.sbm.SpringBatchMybatisDemoApplication : Starting SpringBatchMybatisDemoApplication using Java 11.0.7 on zhoutanghuis-MacBook-Pro.local with PID 14073 (/Users/akira/PersonalProjects/spring-batch-mybatis-demo/build/classes/java/main started by akira in Users/akira/PersonalProjects/spring-batch-mybatis-demo)
2021-12-21 05:06:30.800 INFO 14073 --- [ main] d.sbm.SpringBatchMybatisDemoApplication : No active profile set, falling back to default profiles: default
2021-12-21 05:06:32.651 INFO 14073 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2021-12-21 05:06:33.264 INFO 14073 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2021-12-21 05:06:33.381 INFO 14073 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor.
2021-12-21 05:06:33.386 WARN 14073 --- [ main] o.s.b.c.c.a.DefaultBatchConfigurer : No transaction manager was provided, using a DataSourceTransactionManager
2021-12-21 05:06:33.389 INFO 14073 --- [ main] o.s.b.c.r.s.JobRepositoryFactoryBean : No database type set, using meta data indicating: MYSQL
2021-12-21 05:06:33.509 INFO 14073 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor.
2021-12-21 05:06:33.742 INFO 14073 --- [ main] d.sbm.SpringBatchMybatisDemoApplication : Started SpringBatchMybatisDemoApplication in 3.234 seconds (JVM running for 3.981)
2021-12-21 05:06:33.745 INFO 14073 --- [ main] o.s.b.a.b.JobLauncherApplicationRunner : Running default command line with: []
2021-12-21 05:06:33.793 INFO 14073 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]] launched with the following parameters: [{}]
2021-12-21 05:06:33.815 INFO 14073 --- [ main] demo.sbm.config.SpringBatchConfig : beforeJob! status is STARTED
2021-12-21 05:06:33.822 INFO 14073 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
2021-12-21 05:29:07.195 INFO 14073 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step1] executed in 22m33s373ms
2021-12-21 05:29:07.198 INFO 14073 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step2]
2021-12-21 05:50:53.423 INFO 14073 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step2] executed in 21m46s225ms
2021-12-21 05:50:53.424 INFO 14073 --- [ main] demo.sbm.config.SpringBatchConfig : afterJob! status is COMPLETED
2021-12-21 05:50:53.425 INFO 14073 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 44m19s614ms
2021-12-21 05:50:53.429 INFO 14073 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2021-12-21 05:50:53.434 INFO 14073 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.

Process finished with exit code 0
```
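A little arithmetic on the log shows that the two steps account for almost all of the job's runtime. The sketch below uses `java.time.Duration` with the durations copied from the `AbstractStep` and `SimpleJobLauncher` lines; `LogTiming` is just an illustrative name.

```java
import java.time.Duration;

final class LogTiming {
    // "Step: [step1] executed in 22m33s373ms"
    static final Duration STEP1 = Duration.ofMinutes(22).plusSeconds(33).plusMillis(373);
    // "Step: [step2] executed in 21m46s225ms"
    static final Duration STEP2 = Duration.ofMinutes(21).plusSeconds(46).plusMillis(225);
    // "... status: [COMPLETED] in 44m19s614ms"
    static final Duration JOB = Duration.ofMinutes(44).plusSeconds(19).plusMillis(614);

    static Duration stepsTotal() { return STEP1.plus(STEP2); }

    // Time spent in the job outside of its steps (listener calls, bookkeeping).
    static Duration overhead() { return JOB.minus(stepsTotal()); }
}
```

The steps sum to 44m19s598ms, so the job-level overhead is 16 ms; the runtime is dominated entirely by the per-item work inside the steps.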
3.5 Pitfalls

Finally, a MyBatis + Spring Batch pitfall that you are almost certain to hit.
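3.5.1 If you don't configure the data source in any special way, you will almost always get the following error at execution time: `Cannot change the ExecutorType when there is an existing transaction`.

TL;DR: define two `SqlSessionFactory` beans, one of them dedicated to the batch writers: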
```java
package demo.sbm.config;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.boot.autoconfigure.SpringBootVFS;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import javax.sql.DataSource;

/**
 * Configures multiple SqlSessionFactory beans for MyBatis.
 *
 * @author Victor Zhou
 */
@Configuration
public class MybatisConfig {

    // Default factory: injected wherever no qualifier is given (mappers, cursor readers)
    @Primary
    @Bean(name = "ssf")
    public SqlSessionFactory settlementSqlSessionFactory(DataSource ds) throws Exception {
        return getSqlSessionFactory(ds);
    }

    // Separate factory for MyBatisBatchItemWriter, which uses ExecutorType.BATCH
    @Bean(name = "batchSsf")
    public SqlSessionFactory priceEngineSqlSessionFactory(DataSource ds) throws Exception {
        return getSqlSessionFactory(ds);
    }

    private SqlSessionFactory getSqlSessionFactory(DataSource ds) throws Exception {
        SqlSessionFactoryBean factory = new SqlSessionFactoryBean();
        factory.setDataSource(ds);
        factory.setVfs(SpringBootVFS.class);
        factory.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:mapper/*.xml"));
        org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
        configuration.setMapUnderscoreToCamelCase(true);
        factory.setConfiguration(configuration);
        return factory.getObject();
    }
}
```
Then, when declaring a `MyBatisBatchItemWriter`, specify which `sqlSessionFactory` it uses:

```java
@Bean
public MyBatisBatchItemWriter<User> u2Writer(@Qualifier("batchSsf") SqlSessionFactory ssf) {
    return new MyBatisBatchItemWriterBuilder<User>()
            .sqlSessionFactory(ssf)
            .statementId("demo.sbm.mapper.UserMapper.updateByPrimaryKey")
            .build();
}
```
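The reason: a `SqlSessionTemplate` is created with an `ExecutorType`, and that value cannot be changed for the lifetime of the `sqlSessionTemplate`. By default the `ExecutorType` is `SIMPLE`, whereas `MyBatisBatchItemWriter` uses `BATCH`. Inside a Spring transaction, MyBatis-Spring binds the `SqlSession` to the transaction per `SqlSessionFactory`, so while a transaction is active, templates that share one `sqlSessionFactory` cannot switch to a different `ExecutorType`. The fix is to give the writers their own `SqlSessionFactory`, which is exactly the TL;DR above.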
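The mechanism behind the error can be modeled in a few lines. This is a simplified, hypothetical sketch: `ExecutorType` here is a local enum and `FakeTransaction` is not a MyBatis-Spring class. It assumes, as described above, that the session bound to the current transaction is looked up per `SqlSessionFactory`, so asking the same factory for a different executor type fails, while a second factory gets a session of its own.

```java
import java.util.HashMap;
import java.util.Map;

// Local stand-in for MyBatis's executor types (hypothetical, for illustration only).
enum ExecutorType { SIMPLE, BATCH }

// Models one Spring transaction: at most one session per factory, with a fixed executor type.
final class FakeTransaction {
    private final Map<String, ExecutorType> boundSessions = new HashMap<>();

    String getSession(String factoryName, ExecutorType type) {
        ExecutorType existing = boundSessions.putIfAbsent(factoryName, type);
        if (existing != null && existing != type) {
            throw new IllegalStateException(
                    "Cannot change the ExecutorType when there is an existing transaction");
        }
        return factoryName + ":" + type; // stands in for the SqlSession handed back
    }
}
```

On one `FakeTransaction`, `getSession("ssf", SIMPLE)` followed by `getSession("ssf", BATCH)` throws, while `getSession("ssf", SIMPLE)` followed by `getSession("batchSsf", BATCH)` succeeds. In this demo the `SIMPLE` session most likely comes from `U1Processor` calling `userMapper.findById` inside the chunk transaction, and the `BATCH` request from the writer.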
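3.5.2 By default, Spring Batch uses a database as its `JobRepository`; this example uses an in-memory `JobRepository` instead, configured in the `SpringBatchInMemoryConfig` class. Note that `MapJobRepositoryFactoryBean` and `MapJobExplorerFactoryBean` are deprecated as of Spring Batch 4.3 and removed in 5.0; the recommended alternative is `JobRepositoryFactoryBean` backed by an in-memory database such as H2.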
```java
package demo.sbm.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.explore.support.MapJobExplorerFactoryBean;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Objects;

@Configuration
@Slf4j
public class SpringBatchInMemoryConfig {

    @Bean
    public DefaultBatchConfigurer batchConfigurer() {
        return new DefaultBatchConfigurer() {
            private final JobRepository jobRepository;
            private final JobExplorer jobExplorer;
            private final JobLauncher jobLauncher;

            {
                // Map-based (in-memory) repository; the explorer reads from the same maps
                MapJobRepositoryFactoryBean jobRepositoryFactory = new MapJobRepositoryFactoryBean(); //NOSONAR
                try {
                    this.jobRepository = jobRepositoryFactory.getObject();
                    MapJobExplorerFactoryBean jobExplorerFactory = new MapJobExplorerFactoryBean(jobRepositoryFactory); //NOSONAR
                    this.jobExplorer = jobExplorerFactory.getObject();
                    // Synchronous launcher backed by the in-memory repository
                    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
                    simpleJobLauncher.setJobRepository(Objects.requireNonNull(jobRepository));
                    simpleJobLauncher.afterPropertiesSet();
                    this.jobLauncher = simpleJobLauncher;
                } catch (Exception e) {
                    log.error("Some exception occurred when initializing batch", e);
                    // Keep the original cause so the startup failure is diagnosable
                    throw new IllegalStateException("Failed to initialize in-memory batch components", e);
                }
            }

            @Override
            public JobRepository getJobRepository() {
                return jobRepository;
            }

            @Override
            public JobExplorer getJobExplorer() {
                return jobExplorer;
            }

            @Override
            public JobLauncher getJobLauncher() {
                return jobLauncher;
            }
        };
    }
}
```
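Other

In k8s, a CronJob can be thought of as a Job plus a cron schedule. The same approach makes it easy to define a CronJob for daily batch runs. Compared with the traditional combination of a long-running Java program and a distributed scheduling framework, this saves resources: the pod is created when needed and destroyed when done.

Spring Batch can handle considerably more complex and abstract logic, such as flows and tasklets, and offers ways to improve performance such as concurrent execution; none of these are covered in this article.

The demo code for this article is open source at: https://github.com/CoderVictor6/sbm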


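This article is reposted from 领创集团 Advance Group. If you suspect infringement, please email contact@modb.pro with supporting evidence; once verified, 墨天轮 (modb.pro) will remove the content immediately.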