

//定义一个作业类,实现用户的业务逻辑public class HelloJob implements Job {......实现业务逻辑}//根据作业类得到JobDetailJobDetail jobDetail = JobBuilder.newJob(HelloJob.class)//定义一个触发器,按照规定的时间调度作业Trigger trigger = TriggerBuilder.newTrigger("每隔1分钟执行一次")//根据作业类和触发器创建调度器Scheduler scheduler = scheduler.scheduleJob(jobDetail,trigger);//启动调度器,开始执行任务scheduler .start()
public class MyElasticJob implements SimpleJob {public void execute(ShardingContext context) {//实现业务逻辑......}// 对zookeeper进行设置,作为分布式任务的注册中心private static CoordinatorRegistryCenter createRegistryCenter() {CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("xxxx"));regCenter.init();return regCenter;}//设置任务的执行频率、执行的类private static LiteJobConfiguration createJobConfiguration() {JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build();// 定义SIMPLE类型配置SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MyElasticJob.class.getCanonicalName());// 定义Lite作业根配置LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();return simpleJobRootConfig;}//主函数public static void main(String[] args) {new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();}}
/**
 * Job scheduler (excerpt): holds the job configuration, the registry center and the two facades
 * used to schedule and run a job.
 */
public class JobScheduler {

    public static final String ELASTIC_JOB_DATA_MAP_KEY = "elasticJob";

    private static final String JOB_FACADE_DATA_MAP_KEY = "jobFacade";

    // Job configuration.
    private final LiteJobConfiguration liteJobConfig;

    // Registry center.
    private final CoordinatorRegistryCenter regCenter;

    // Scheduler facade.
    private final SchedulerFacade schedulerFacade;

    // Job facade.
    private final JobFacade jobFacade;

    /**
     * Registers a new {@code JobInstance} for the job name in {@code JobRegistry}, stores the
     * configuration and registry center, and builds both facades.
     *
     * @param regCenter registry center
     * @param liteJobConfig job configuration
     * @param jobEventBus event bus handed to the job facade
     * @param elasticJobListeners job listeners handed to both facades
     */
    private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig,
                         final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
        JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
        this.liteJobConfig = liteJobConfig;
        this.regCenter = regCenter;
        List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
        setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
        schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
        // Reuse the list built above instead of wrapping the same array a second time.
        jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList, jobEventBus);
    }

    // Excerpt: remaining members omitted; the class body is left open exactly as in the original snippet.
/**
 * Initializes the job.
 *
 * <p>Records the sharding total count in {@code JobRegistry}, builds the
 * {@code JobScheduleController}, registers it together with the registry center, registers the
 * start-up information through the scheduler facade, and finally schedules the job with the
 * configured cron expression.
 */
public void init() {
    final String jobName = liteJobConfig.getJobName();

    JobRegistry.getInstance().setCurrentShardingTotalCount(
            jobName, liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount());

    JobScheduleController jobScheduleController = new JobScheduleController(
            createScheduler(),
            createJobDetail(liteJobConfig.getTypeConfig().getJobClass()),
            jobName);
    JobRegistry.getInstance().registerJob(jobName, jobScheduleController, regCenter);

    schedulerFacade.registerStartUpInfo(liteJobConfig);

    jobScheduleController.scheduleJob(liteJobConfig.getTypeConfig().getCoreConfig().getCron());
}
/*** 注册作业启动信息.** @param liteJobConfig 作业配置*/public void registerStartUpInfo(final LiteJobConfiguration liteJobConfig) {regCenter.addCacheData("/" + liteJobConfig.getJobName());// 开启所有监听器listenerManager.startAllListeners();// 选举主节点leaderService.electLeader();//持久化job的配置信息configService.persist(liteJobConfig);LiteJobConfiguration liteJobConfigFromZk = configService.load(false);// 持久化作业服务器上线信息serverService.persistOnline(!liteJobConfigFromZk.isDisabled());// 持久化作业运行实例上线相关信息,将服务实例注册到zkinstanceService.persistOnline();// 设置 需要重新分片的标记shardingService.setReshardingFlag();// 初始化 作业监听服务monitorService.listen();// 初始化 调解作业不一致状态服务if (!reconcileService.isRunning()) {reconcileService.startAsync();}}
/**
 * Schedules the job.
 *
 * <p>The job detail is registered with a trigger built from {@code cron} only when the scheduler
 * does not already know its key; the scheduler is started in either case.
 *
 * @param cron CRON expression
 * @throws JobSystemException wrapping any {@code SchedulerException} raised by the scheduler
 */
public void scheduleJob(final String cron) {
    try {
        final boolean alreadyScheduled = scheduler.checkExists(jobDetail.getKey());
        if (!alreadyScheduled) {
            scheduler.scheduleJob(jobDetail, createTrigger(cron));
        }
        scheduler.start();
    } catch (final SchedulerException schedulerFailure) {
        throw new JobSystemException(schedulerFailure);
    }
}

private JobDetail createJobDetail(final String jobClass) {JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();//忽略其它代码}
public final class LiteJob implements Job {@Setterprivate ElasticJob elasticJob;@Setterprivate JobFacade jobFacade;@Overridepublic void execute(final JobExecutionContext context) throws JobExecutionException {JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();}}
public final class JobExecutorFactory {/*** 获取作业执行器.** @param elasticJob 分布式弹性作业* @param jobFacade 作业内部服务门面服务* @return 作业执行器*/@SuppressWarnings("unchecked")public static AbstractElasticJobExecutor getJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade) {// ScriptJobif (null == elasticJob) {return new ScriptJobExecutor(jobFacade);}// SimpleJobif (elasticJob instanceof SimpleJob) {return new SimpleJobExecutor((SimpleJob) elasticJob, jobFacade);}// DataflowJobif (elasticJob instanceof DataflowJob) {return new DataflowJobExecutor((DataflowJob) elasticJob, jobFacade);}throw new JobConfigurationException("Cannot support job type '%s'", elasticJob.getClass().getCanonicalName());}}
// AbstractElasticJobExecutor.javapublic final void execute() {// 检查作业执行环境try {jobFacade.checkJobExecutionEnvironment();} catch (final JobExecutionEnvironmentException cause) {jobExceptionHandler.handleException(jobName, cause);}// 获取当前作业服务器的分片上下文ShardingContexts shardingContexts = jobFacade.getShardingContexts();// 发布作业状态追踪事件(State.TASK_STAGING)if (shardingContexts.isAllowSendJobEvent()) {jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName));}// 跳过存在运行中的被错过作业if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {// 发布作业状态追踪事件(State.TASK_FINISHED)if (shardingContexts.isAllowSendJobEvent()) {jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName,shardingContexts.getShardingItemParameters().keySet()));}return;}// 执行作业执行前的方法try {jobFacade.beforeJobExecuted(shardingContexts);//CHECKSTYLE:OFF} catch (final Throwable cause) {//CHECKSTYLE:ONjobExceptionHandler.handleException(jobName, cause);}// 执行普通触发的作业execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);// 执行被跳过触发的作业while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);}// 执行作业失效转移jobFacade.failoverIfNecessary();// 执行作业执行后的方法try {jobFacade.afterJobExecuted(shardingContexts);//CHECKSTYLE:OFF} catch (final Throwable cause) {//CHECKSTYLE:ONjobExceptionHandler.handleException(jobName, cause);}}

作者简介
Xinchun OPPO高级后端工程师
目前负责分布式作业调度的研发，关注消息队列、Redis 数据库、Elasticsearch 等中间件技术。
文章转载自OPPO数智技术,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




