暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Java项目笔记之分布式任务调度Elastic-job

java学途 2021-06-21
979


不点蓝字,我们哪来故事?



分布式调度Elastic-job

1.概述

1.1什么是任务调度

我们可以思考一下下面业务场景的解决方案:

  • 某电商平台需要每天上午10点,下午3点,晚上8点发放一批优惠券

  • 某银行系统需要在信用卡到期还款日的前三天进行短信提醒

  • 某财务系统需要在每天凌晨0:10分结算前一天的财务数据,统计汇总

以上场景就是任务调度所需要解决的问题

任务调度是为了自动完成特定任务,在约定的特定时刻去执行任务的过程;可以就理解成定时任务(以前做Redis的持久化)。

我们在之前骡窝窝项目的学习中,使用过Spring中提供的定时任务注解@Scheduled

在业务类中方法中贴上这个注解

     @Scheduled(cron = "0/20 * * * * ? ")
    public void doWork(){
    //doSomething
    }

    然后在启动类上贴上@EnableScheduling
    注解,但是它仅仅能够适应单体应用的任务调度。

    1.2 为什么需要分布式调度

    感觉Spring给我们提供的这个注解可以完成任务调度的功能,好像已经完美解决问题了,为什么还需要分布式呢?

    主要有如下这几点原因:

    1.单机处理极限:原本1分钟内需要处理1万个订单,但是现在需要1分钟内处理10万个订单;原来一个统计需要1小时,现在业务方需要10分钟就统计出来。你也许会说,你也可以多线程、单机多进程处理。的确,多线程并行处理可以提高单位时间的处理效率,但是单机能力毕竟有限(主要是CPU、内存和磁盘),始终会有单机处理不过来的情况。

    2.高可用:单机版的定式任务调度只能在一台机器上运行,如果程序或者系统出现异常就会导致功能不可用。虽然可以在单机程序实现的足够稳定,但始终有机会遇到非程序引起的故障,而这个对于一个系统的核心功能来说是不可接受的。

    3.防止重复执行: 在单机模式下,定时任务是没什么问题的。但当我们部署了多台服务,同时又每台服务又有定时任务时,若不进行合理的控制在同一时间,只有一个定时任务启动执行,这时,定时执行的结果就可能存在混乱和错误了

    这个时候就需要分布式的任务调度来实现了。

    1.3 Elastic-Job介绍

    Elastic-Job是一个分布式调度的解决方案,由当当网开源,它由两个相互独立的子项目Elastic-job-Lite和Elastic-Job-Cloud组成,使用Elastic-Job可以快速实现分布式任务调度。

    Elastic-Job的github地址:https://github.com/elasticjob

    功能列表:

    • 分布式调度协调

      在分布式环境中,任务能够按照指定的调度策略执行,并且能够避免同一任务多实例重复执行。

    • 丰富的调度策略:

      基于成熟的定时任务作业框架Quartz cron表达式执行定时任务。

    • 弹性拓容缩容

      当集群中增加一个实例,它应当能够被选举被执行任务;当集群减少一个实例时,他所执行的任务能被转移到别的示例中执行。

    • 失效转移

      某示例在任务执行失败后,会被转移到其他实例执行。

    • 错过执行任务重触发

      若因某种原因导致作业错过执行,自动记录错误执行的作业,并在下次次作业完成后自动触发。

    • 支持并行调度

      支持任务分片,任务分片是指将一个任务分成多个小任务在多个实例同时执行。

    • 作业分片一致性

      当任务被分片后,保证同一分片在分布式环境中仅一个执行实例。

    • 支持作业生命周期操作

      可以动态对任务进行开启及停止操作。

    • 丰富的作业类型

      支持Simple、DataFlow、Script三种作业类型

    系统架构图


    2.Elastic-Job快速入门

    2.1 环境搭建

    2.1.1 版本要求

    • JDK 要求1.7以上版本

    • Maven 要求3.0.4及以上版本

    • Zookeeper 要求采取3.4.6以上版本

    2.1.2 Zookeeper安装&运行

    2.1.3 创建Maven项目

    添加如下依赖

       <dependency>
      <groupId>com.dangdang</groupId>
      <artifactId>elastic-job-lite-core</artifactId>
      <version>2.1.5</version>
      </dependency>

      2.2 代码实现

      2.2.1 任务类

         /**
        * 自己的任务类
        */
        public class MyElasticJob implements SimpleJob {
        //会根据 cron 表达式定义的时间来执行这个方法;shardingContext分片才会用到
        public void execute(ShardingContext shardingContext) {
        System.out.println("定时任务开始执行————>" + new Date());
        }
        }

        2.2.2 配置类

           public class JobDemo {
          public static void main(String[] args) {
          //创建 JobScheduler ,然后 init 方法初始化
          //参数1:注册中心zookeeper配置;参数2:任务相关的配置
          new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
          }

          //启动作业配置
          private static CoordinatorRegistryCenter createRegistryCenter() {
          //参数1:配置zookeeper的地址,调度任务的组名;参数2:项目名即可
          ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("localhost:2181", "elastic-job-demo");
          //设置超时时间(一定要配置)
          zookeeperConfiguration.setSessionTimeoutMilliseconds(100);
          //创建注册中心配置
          CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
          //初始化
          regCenter.init();
          return regCenter;
          }

          //作业配置
          private static LiteJobConfiguration createJobConfiguration() {
          // 定义作业核心配置
          //参数1:作业名称;参数2:corn 表达式;参数3:分片数
          JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/3 * * * * ?", 1).build();
          // 定义SIMPLE类型配置
          //参数1:作业配置;参数2:任务类的全限定名;
          SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MyElasticJob.class.getCanonicalName());
          // 定义Lite作业根配置
          LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
          return simpleJobRootConfig;
          }
          }
          需要运行zookeeper服务:
          1. 解压文件:zookeeper-3.4.11.tar.gz

          2. 进入conf目录下,拷贝zoo_sample.cfg
            文件,并将文件名改成zoo.cfg
            ,用于加载自定义配置的,这里不需要做任何修改;

          3. bin目录下双击zkServer.cmd
            文件启动zookeeper服务;

          4. 双击zkCli.cmd
            启动窗口界面;ls 可以看到节点;不方便,选择使用图形化界面;

          图形化界面:

          1. 解压文件ZooInspector.zip

          2. 进入builder中运行jar包:zookeeper-dev-ZooInspector.jar

          3. 启动后点击连接,不做修改,OK即可。

          2.2.3 测试

          • 运行单个程序,查看是否按照cron表达式的内容进行任务的调度

          • 运行多个程序(集群),查看是否只会有一个实例进行任务调度

          • 运行多个程序后,把正在进行任务调度的进程关掉,查看其它进程是否能继续进行任务调度

            这样就表示了elastic-job的高可用



          3.SpringBoot集成Elastic-Job

          3.1 添加Maven依赖

                 <groupId>cn.wolfcode</groupId>
            <artifactId>elastic-job-springboot</artifactId>
            <version>1.0.0</version>
            <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.3.RELEASE</version>
            </parent>
            <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
            </properties>
            <dependencies>
            <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-spring</artifactId>
            <version>2.1.5</version>
            </dependency>
            <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            </dependency>
            </dependencies>

            elastic-job-lite-spring
            和SpringBoot整合需要导如的依赖;在elastic-job
            中没有提供想starter之类的依赖,所以需要我们自己把bean通过注解的方式创建出来;可以自定义stater,自己封装也比较简单??(我没有思路)


            3.2 相关配置

            因为配置中心的地址并不是固定的,所以我们应该把这个地址信息配置在配置文件中,所以在配置文件application.yml中添加配置如下:

               #配置中心的地址和命名空间按,在配置文件管理,方法中获取
              elastic-job:
              zookeeper-url: localhost:2181
              group-name: elastic-job-group

              zk注册中心配置类:

                 /**
                * zookeeper 注册中心配置类
                */
                @Configuration
                public class RegistryCenterConfig {

                ////启动作业配置
                @Bean(initMethod = "init") //交给spring完成bean的创建,创建完成之后调用初始化的方法
                public CoordinatorRegistryCenter createRegistryCenter(
                @Value("${elastic-job.zookeeper-url}") String zookeeperUrl, @Value("${elastic-job.group-name}") String groupName) {
                //参数1:配置zookeeper的地址,调度任务的组名;参数2:项目名即可
                ZookeeperConfiguration zookeeperConfiguration =
                new ZookeeperConfiguration(zookeeperUrl, groupName);
                //设置超时时间(一定要配置)
                zookeeperConfiguration.setSessionTimeoutMilliseconds(100);//其实这个100也可以配到配置文件中
                //创建注册中心配置
                CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
                //初始化
                //regCenter.init(); 在 @Bean 中配置了调用初始化的方法,所以这里不需要了
                return regCenter;
                }

                }


                自己的任务类:交给spring管理

                   /**
                  * 自己的任务类
                  */
                  @Component //需要交给spring来进行管理
                  public class MyElasticJob implements SimpleJob {
                  //会根据 cron 表达式定义的时间来执行这个方法;shardingContext分片才会用到
                  public void execute(ShardingContext shardingContext) {
                  System.out.println("定时任务开始执行————>" + new Date());
                  }
                  }


                  任务调度配置类:

                    /**
                    * 任务调度配置类
                    */
                    @Configuration
                    public class ElasticJobConfig {
                    @Autowired //注入下文参数2
                    private CoordinatorRegistryCenter registryCenter;


                    //返回作业配置的方法 参数3 需要
                    //同样的 newBuilder的参数不能写死,用方法来传递参数来解决,所以定义了四个形参
                    private LiteJobConfiguration createJobConfig(Class<? extends ElasticJob> clazz, //任务类的字节码
                    String cron, //cron表达式
                    int shrdingTotalCount, //分片个数
                    String shardingParamter //分片参数
                    ) {
                    JobCoreConfiguration.Builder builder = JobCoreConfiguration.newBuilder(clazz.getSimpleName(), cron, shrdingTotalCount);
                    if (!StringUtils.isEmpty(shardingParamter)) {
                    //如果传了分片参数,我们需要设置进去
                    builder.shardingItemParameters(shardingParamter);
                    }


                    // 定义作业核心配置
                    //参数1:作业名称;参数2:corn 表达式;参数3:分片数
                    //JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/3 * * * * ?", 1).build(); 不写死
                    JobCoreConfiguration simpleCoreConfig = builder.build(); //把builder提起前的原因是要设置分片的次数
                    // 定义SIMPLE类型配置
                    //参数1:作业配置;参数2:任务类的全限定名;
                    SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, clazz.getCanonicalName());
                    // 定义Lite作业根配置
                    //overwrite(true):如果没有这个配置,分片参数的值将始终是第一次设置的值,不会被覆盖,代码修改没有效果,只能删除配置重新设置;加上即可覆盖
                    LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();
                    return simpleJobRootConfig;
                    }


                    @Bean(initMethod = "init") // 需要将MyElasticJob对象给注入进来,这里给MyElasticJob创建对象并做初始化
                    public SpringJobScheduler initMyElasticJobBean(MyElasticJob myElasticJob) {
                    //需要的参数:
                    // 参数1ElasticJob类型的参数:myElasticJob是间接继承自ElasticJob的,
                    // 参数2:需要注入RegistryCenterConfig的对象
                    // 参数3LiteJobConfiguration:内容比较多,封装成一个方法createJobConfig,
                    return new SpringJobScheduler(myElasticJob, registryCenter,
                    createJobConfig(myElasticJob.getClass(),"0/3 * * * * ?", 1, null));
                    }
                    }


                    测试:

                    如何开启多个服务呢?


                    效果和快速入门是一样的,两个服务器只有一个在执行分时任务,当执行的服务器挂掉之后,会选举出新的服务器做分时任务。



                    4.案例需求

                    需求:数据库中有一些列的数据,需要对这些数据进行备份操作,备份完之后,修改数据的状态,标记已经备份了。

                    4.1 初始化数据

                    在数据库中导入elastic-job-demo.sql
                    数据;

                    4.2 集成Druid&MyBatis

                    4.2.1 添加依赖

                              <dependency>
                      <groupId>com.alibaba</groupId>
                      <artifactId>druid</artifactId>
                      <version>1.1.10</version>
                      </dependency>
                      <dependency>
                      <groupId>org.mybatis.spring.boot</groupId>
                      <artifactId>mybatis-spring-boot-starter</artifactId>
                      <version>1.2.0</version>
                      </dependency>
                      <!--mysql驱动-->
                      <dependency>
                      <groupId>mysql</groupId>
                      <artifactId>mysql-connector-java</artifactId>
                      </dependency>

                      4.2.2 添加配置

                        spring:
                        datasource:
                        url: jdbc:mysql://localhost:3306/elastic-job-demo?serverTimezone=GMT%2B8
                        driverClassName: com.mysql.jdbc.Driver
                        type: com.alibaba.druid.pool.DruidDataSource
                        username: root
                        password: root

                        4.2.3 添加实体类

                          @Data
                          public class FileCustom {
                          //唯一标识
                          private Long id;
                          //文件名
                          private String name;
                          //文件类型
                          private String type;
                          //文件内容
                          private String content;
                          //是否已备份
                          private Boolean backedUp = false;
                          public FileCustom(){}
                          public FileCustom(Long id, String name, String type, String content){
                          this.id = id;
                          this.name = name;
                          this.type = type;
                          this.content = content;
                          }
                          }

                          4.2.4 添加Mapper处理类

                          使用注解的方式操作MyBatis执行SQL语句:

                            /**
                            * 连接MySQL数据库的mapper接口
                            */
                            @Mapper
                            public interface FileCustomMapper {
                            //执行SQL的方式有两种:mapper.xml 和注解的方式,这里使用注解的方式执行SQL语句


                            @Select("SELECT * FROM t_file_custom WHERE backedUp = 0")
                            List<FileCustom> selectAll();
                            @Update("UPDATE t_file_custom SET backedUp = #{state} WHERE id = #{id}")
                            int changeState(@Param("id") Long id,@Param("state") int state);
                            }


                            4.3 业务功能实现

                            4.3.1 添加任务类

                              /**
                              * 任务类:数据库中有一些列的数据,需要对这些数据进行备份操作,备份完之后,修改数据的状态,标记已经备份了。
                              */
                              @Component
                              public class FileCustomJob implements SimpleJob{
                              @Autowired
                              private FileCustomMapper fileCustomMapper;
                              @Override
                              public void execute(ShardingContext shardingContext) {
                              doWork();
                              }


                              private void doWork(){
                              List<FileCustom> fileList = fileCustomMapper.selectAll();
                              System.out.println("此次需要备份文件个数:"+fileList.size());
                              for(FileCustom fileCustom:fileList){
                              backUpFile(fileCustom);
                              }
                              }
                              private void backUpFile(FileCustom fileCustom){
                              try {
                              //模拟备份动作,假设一堆操作花一秒钟
                              TimeUnit.SECONDS.sleep(1);
                              } catch (InterruptedException e) {
                              e.printStackTrace();
                              }
                              System.out.println("执行文件备份====> id:"+fileCustom.getId()+" 类型:"+fileCustom.getType());
                              fileCustomMapper.changeState(fileCustom.getId(),1);
                              }
                              }

                              4.3.2 添加任务调度配置

                              在配置类中新增这个Bean,将任务类加入配置当中生效。

                                @Bean(initMethod = "init")
                                public SpringJobScheduler initFileCustomElasticJob(FileCustomElasticJob fileCustomElasticJob){
                                SpringJobScheduler springJobScheduler = new SpringJobScheduler(fileCustomElasticJob,registryCenter,createJobConfiguration(FileCustomElasticJob.class,"0 0/1 * * * ?",1,null));
                                return springJobScheduler;
                                }

                                4.4 测试&问题

                                为了高可用,我们会对这个项目做集群的操作,可以保证其中一台挂了,另外一台可以继续工作.但是在集群的情况下,调度任务只在一台机器上运行,如果单个任务调度比较耗时,耗资源的情况下,对这台机器的消耗还是比较大的,

                                但是这个时候,其他机器却是空闲着的。如何合理的利用集群的其他机器且如何让任务执行得更快些呢?这时候Elastic-Job提供了任务调度分片的功能。


                                5.分片概念

                                作业分片是指任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的应用实例分别执行某一个或者几个分布项。

                                例如:Elastic-Job快速入门中文件备份的案例,现有两台服务器,每台服务器分别跑一个应用实例。为了快速执行作业,那么可以讲任务分成4片,每个应用实例都执行两片。作业遍历数据逻辑应为:实例1查找text和image类型文件执行备份,实例2查找radio和vedio类型文件执行备份。如果由于服务器拓容应用实例数量增加为4,则作业遍历数据的逻辑应为: 4个实例分别处理text,image,radio,video类型的文件。

                                可以看到,通过对任务的合理分片化,从而达到任务并行处理的效果.

                                分片项与业务处理解耦

                                Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系

                                最大限度利用资源

                                将分片项设置大于服务器的数据,最好是大于服务器倍数的数量,作业将会合理利用分布式资源,动态的分配分片项.

                                例如: 3台服务器,分成10片,则分片项结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9.如果   服务器C奔溃,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9.在不丢失分片项的情况下,最大限度利用现有的资源提高吞吐量。

                                在没有分片的情况下,任务仅仅在一台服务器上执行的,并且是串行执行,所有数据逐条执行,完成整个任务还是很耗时,对执行的服务器压力还是很大。我们可以将数据按照业务逻辑(type)进行分片;

                                分成四片:0:text、1:image、2:radio、3:video  每种序号的分片分别处理四种类型的数据;


                                6.案例改造成任务分片

                                6.1 配置类修改

                                在任务配置类中增加分片个数以及分片参数.

                                  @Bean(initMethod = "init")
                                  public SpringJobScheduler initFileCustomElasticJob(FileCustomElasticJob fileCustomElasticJob){
                                  SpringJobScheduler springJobScheduler = new SpringJobScheduler(
                                  fileCustomElasticJob,
                                  registryCenter,
                                  createJobConfiguration(FileCustomElasticJob.class,"0 0/1 * * * ?",4,"0=text,1=image,2=radio,3=vedio"));
                                  return springJobScheduler;
                                  }

                                  6.2 新增作业分片逻辑

                                    @Component
                                    public class FileCustomJob implements SimpleJob {
                                    @Autowired
                                    private FileCustomMapper fileCustomMapper;


                                    @Override
                                    public void execute(ShardingContext shardingContext) {
                                    System.out.println("ShardingParameter:" + shardingContext.getShardingParameter());
                                    System.out.println("ShardingItem:" + shardingContext.getShardingItem());
                                    doWork(shardingContext.getShardingParameter());
                                    }


                                    private void doWork(String shardingParameter) {
                                    List<FileCustom> fileList = fileCustomMapper.selecByType(shardingParameter);
                                    System.out.println("线程id :" + Thread.currentThread().getId());
                                    System.out.println("分片的类型为:" + shardingParameter + "此次需要备份文件个数:" + fileList.size());
                                    for (FileCustom fileCustom : fileList) {
                                    backUpFile(fileCustom);
                                    }
                                    }


                                    private void backUpFile(FileCustom fileCustom) {
                                    try {
                                    //模拟备份动作,假设一堆操作花一秒钟
                                    TimeUnit.SECONDS.sleep(1);
                                    } catch (InterruptedException e) {
                                    e.printStackTrace();
                                    }
                                    System.out.println("执行文件备份====> id:" + fileCustom.getId() + " 类型:" + fileCustom.getType());
                                    fileCustomMapper.changeState(fileCustom.getId(), 1);
                                    }
                                    }

                                    6.3 Mapper类修改

                                      @Mapper
                                      public interface FileCustomMapper {
                                      @Select("select * from t_file_custom where backedUp = 0")
                                      List<FileCustom> selectAll();
                                      @Select("select * from t_file_custom where backedUp = 0 and type=#{fileType}")
                                      List<FileCustom> selecByType(String fileType);
                                      @Update("update t_file_custom set backedUp = #{state} where id = #{id}")
                                      int changeState(@Param("id") Long id, @Param("state")int state);
                                      }

                                      6.4 测试

                                      • 只有一台机器的情况下,任务分片是如何执行的:4个线程并行执行;

                                      • 有多台机器的情况下,任务分片是如何执行的:两台服务器,每个服务器两条线程并行执行;



                                      7.Dataflow类型调度任务

                                      Dataflow类型的定时任务需要实现Dataflowjob接口,该接口提供2个方法供覆盖,分别用于抓取(fetchData)和处理(processData)数据,我们继续对例子进行改造。

                                      Dataflow类型用于处理数据流,他和SimpleJob不同,它以数据流的方式执行,调用fetchData抓取数据,知道抓取不到数据才停止作业。

                                      SimpleJob类型是一次性的取出,然后执行;

                                      Dataflow每次取,直到取不到为止;

                                      应用场景

                                      现在定时任务需要处理100w数据;单条数据对象比较大,一次性查询出来的话,到时内存溢出。所以选择Dataflow类型的定时任务;Dataflow一次取10w,10w做完之后再去下一个10w的数据。

                                      7.1 任务类

                                        @Component
                                        public class FileDataflowJob implements DataflowJob<FileCustom> {
                                        @Autowired
                                        private FileCustomMapper fileCustomMapper;
                                        @Override
                                        public List<FileCustom> fetchData(ShardingContext shardingContext) {
                                        List<FileCustom> fileCustoms = fileCustomMapper.fetchData(2);
                                        System.out.println("抓取时间:"+new Date()+",个数"+fileCustoms.size());
                                        return fileCustoms;
                                        }
                                        @Override
                                        public void processData(ShardingContext shardingContext, List<FileCustom> data) {
                                        for(FileCustom fileCustom:data){
                                        backUpFile(fileCustom);
                                        }
                                        }
                                        private void backUpFile(FileCustom fileCustom){
                                        try {
                                        //模拟备份动作
                                        TimeUnit.SECONDS.sleep(1);
                                        } catch (InterruptedException e) {
                                        e.printStackTrace();
                                        }
                                        System.out.println("执行文件备份====>"+fileCustom);
                                        fileCustomMapper.changeState(fileCustom.getId(),1);
                                        }
                                        }

                                        Mapper中的fetchData方法:

                                            @Select("SELECT * FROM t_file_custom WHERE backedUp = 0 limit 0,#{size}")
                                          List<FileCustom> fetchData(int size);


                                          7.2 配置类


                                            /**
                                            * 任务调度配置类
                                            */
                                            @Configuration
                                            public class ElasticJobConfig {
                                            @Autowired //注入下文参数2
                                            private CoordinatorRegistryCenter registryCenter;


                                            //返回作业配置的方法 参数3 需要
                                            //同样的 newBuilder的参数不能写死,用方法来传递参数来解决,所以定义了四个形参
                                            private LiteJobConfiguration createJobConfig(Class<? extends ElasticJob> clazz, //任务类的字节码
                                            String cron, //cron表达式
                                            int shrdingTotalCount, //分片个数
                                            String shardingParamter, //分片参数
                                            Boolean isDataFlowType //是否是数据流DataflowJob类型的任务
                                            ) {
                                            JobCoreConfiguration.Builder builder = JobCoreConfiguration.newBuilder(clazz.getSimpleName(), cron, shrdingTotalCount);
                                            if (!StringUtils.isEmpty(shardingParamter)) {
                                            //如果传了分片参数,我们需要设置进去
                                            builder.shardingItemParameters(shardingParamter);
                                            }


                                            //数据流DataflowJob类型的任务需要用到 JobTypeConfiguration 做配置定义
                                            JobTypeConfiguration jobTypeConfiguration = null;


                                            //根据不同的;任务累心设置不同的配置对象
                                            if (isDataFlowType){
                                            //是DataflowJob数据流类型的任务
                                            // 定义DataflowJob类型配置
                                            jobTypeConfiguration = new DataflowJobConfiguration(builder.build(), clazz.getCanonicalName(),true);
                                            }else {
                                            //不是DataflowJob数据流类型的任务
                                            // 定义SIMPLE类型配置
                                            jobTypeConfiguration = new SimpleJobConfiguration(builder.build(), clazz.getCanonicalName());
                                            }


                                            // 定义Lite作业根配置
                                            //overwrite(true):如果没有这个配置,分片参数的值将始终是第一次设置的值,不会被覆盖,代码修改没有效果,只能删除配置重新设置;加上即可覆盖
                                            LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(jobTypeConfiguration).overwrite(true).build();
                                            return simpleJobRootConfig;
                                            }
                                            /*


                                            返回作业配置的方法 参数3 需要
                                            同样的 newBuilder的参数不能写死,用方法来传递参数来解决,所以定义了四个形参
                                            private LiteJobConfiguration createJobConfig(Class<? extends ElasticJob> clazz, 任务类的字节码
                                            String cron, cron表达式
                                            int shrdingTotalCount, 分片个数
                                            String shardingParamter 分片参数
                                            ) {
                                            JobCoreConfiguration.Builder builder = JobCoreConfiguration.newBuilder(clazz.getSimpleName(), cron, shrdingTotalCount);
                                            if (!StringUtils.isEmpty(shardingParamter)) {
                                            //如果传了分片参数,我们需要设置进去
                                            builder.shardingItemParameters(shardingParamter);
                                            }


                                            // 定义作业核心配置
                                            //参数1:作业名称;参数2:corn 表达式;参数3:分片数
                                            //JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/3 * * * * ?", 1).build(); //不写死
                                            JobCoreConfiguration simpleCoreConfig = builder.build(); //把builder提起前的原因是要设置分片的次数
                                            // 定义SIMPLE类型配置
                                            //参数1:作业配置;参数2:任务类的全限定名;
                                            SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, clazz.getCanonicalName());
                                            // 定义Lite作业根配置
                                            //overwrite(true):如果没有这个配置,分片参数的值将始终是第一次设置的值,不会被覆盖,代码修改没有效果,只能删除配置重新设置;加上即可覆盖
                                            LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();
                                            return simpleJobRootConfig;
                                            }
                                            */






                                            /*


                                            @Bean(initMethod = "init") // 需要将MyElasticJob对象给注入进来,这里给MyElasticJob创建对象并做初始化
                                            public SpringJobScheduler initMyElasticJobBean(MyElasticJob myElasticJob) {
                                            //需要的参数:
                                            // 参数1ElasticJob类型的参数:myElasticJob是间接继承自ElasticJob的,
                                            // 参数2:需要注入RegistryCenterConfig的对象
                                            // 参数3LiteJobConfiguration:内容比较多,封装成一个方法createJobConfig,
                                            return new SpringJobScheduler(myElasticJob, registryCenter,
                                            createJobConfig(myElasticJob.getClass(),"0/3 * * * * ?", 1, null));
                                            }
                                            */
                                            /*


                                            @Bean(initMethod = "init") //一分钟去执行
                                            public SpringJobScheduler initMyElasticJobBean(FileCustomJob fileCustomJob) {
                                            return new SpringJobScheduler(fileCustomJob, registryCenter,
                                            //createJobConfig(fileCustomJob.getClass(),"0 0/1 * * * ?", 1, null)); //不分片的情况
                                            createJobConfig(fileCustomJob.getClass(),"0 0/1 * * * ?", 4, "0=text,1=image,2=radio,3=vedio"));
                                            }
                                            */


                                            @Bean(initMethod = "init") //一分钟去执行一次
                                            public SpringJobScheduler initFileDataflowJobBean(FileDataflowJob fileDataflowJob) {
                                            return new SpringJobScheduler(fileDataflowJob, registryCenter,
                                            createJobConfig(fileDataflowJob.getClass(),"0 0/1 * * * ?", 1, null,true)); //不分片的情况
                                            }
                                            }

                                            7.3 测试



                                            8.运维管理

                                            8.1 事件追踪

                                            Elastic-Job-Lite在配置中提供了JobEventConfiguration,支持数据库方式配置,会在数据库中自动创建JOB_EXECUTION_LOG和JOB_STATUS_TRACE_LOG两张表以及若干索引来近路作业的相关信息。

                                            8.1.1 修改Elastic-Job配置类

                                            在ElasticJobConfig配置类中注入DataSource

                                              @Configuration
                                              public class ElasticJobConfig {
                                              @Autowired //注入DataSource
                                              private DataSource dataSource;
                                              ......
                                              }

                                              在任务配置中增加事件追踪配置

                                                @Bean(initMethod = "init")
                                                public SpringJobScheduler initFileCustomElasticJob(FileCustomElasticJob fileCustomElasticJob){
                                                //增加任务事件追踪配置,即可做事件追踪
                                                JobEventConfiguration jobEventConfiguration = new JobEventRdbConfiguration(dataSource);
                                                SpringJobScheduler springJobScheduler = new SpringJobScheduler(
                                                fileCustomElasticJob,
                                                registryCenter,
                                                createJobConfiguration(FileCustomElasticJob.class,"0 0/1 * * * ?",4,"0=text,1=image,2=radio,3=vedio",false),
                                                jobEventConfiguration);
                                                return springJobScheduler;
                                                }

                                                8.1.2 日志信息表

                                                启动后会发现在elastic-job-demo数据库中新增以下两张表

                                                job_execution_log

                                                记录每次作业的执行历史,分为两个步骤:

                                                1.作业开始执行时间想数据库插入数据.

                                                2.作业完成执行时向数据库更新数据,更新is_success,complete_time和failure_cause(如果任务执行失败)


                                                job_status_trace_log

                                                记录作业状态变更痕迹表,可通过每次作业运行的task_id查询作业状态变化的生命轨迹和运行轨迹.

                                                8.2 运维控制台

                                                elastic-job中提供了一个elastic-job-lite-console控制台

                                                设计理念

                                                1.本 控制台和Elastic-Job并无直接关系,是通过读取Elastic-Job的注册中心数据展示作业状态,或更新注册中心数据修改全局配置。

                                                2.控制台只能控制任务本身是否运行,但不能控制作业进程的启停,因为控制台和作业本身服务器是完全分布式的,控制台并不能控制作业服务器。

                                                主要功能:

                                                1.查看作业以及服务器状态

                                                2.快捷的修改以及删除作业配置

                                                3.启用和禁用作业

                                                4.跨注册中心查看作业

                                                5.查看作业运行轨迹和运行状态

                                                不支持项

                                                1.添加作业,因为作业都是在首次运行时自动添加,使用控制台添加作业并无必要.直接在作业服务器启动包含Elasitc-Job的作业进程即可。

                                                8.2.1 搭建步骤

                                                • 解压缩elastic-job-lite-console-2.1.5.tar

                                                • 进入bin目录,并执行:

                                                    bin\start.bat


                                                  • 打开浏览器访问http://localhost:8899

                                                    用户名: root 密码: root,进入之后界面如下:

                                                  提供两种用户:管理员和访客,管理员拥有全部操作权限,访客仅拥有查看权限。默认管理员账号和面膜是root/root,访客用户名和密码是guest/guest,通过conf\auth.properties可以修改管理员以及访客用户名及密码

                                                  8.2.2 配置及使用

                                                  • 配置注册中心地址

                                                    先启动zookeeper然后再注册中心配置界面,点添加

                                                  点击提交后,然后点连接(zookeeper必须处于启动状态)


                                                  连接成功后,在作业纬度下可以显示该命名空间作业名称,分片数量及该作业的cron表达式等信息

                                                  在服务器纬度可以查看到服务器ip,当前运行的是实例数,作业总数等信息。

                                                  添加数据库连接之后可以查看任务的执行结果

                                                  然后在作业历史中就可以看到任务执行历史了。



                                                  补充

                                                  管控台的数据其实都是从zookeeper
                                                  和数据库获取出来的。





                                                  java学途

                                                  只分享有用的Java技术资料 

                                                  扫描二维码关注公众号

                                                   


                                                  笔记|学习资料|面试笔试题|经验分享 

                                                  如有任何需求或问题欢迎骚扰。微信号:JL2020aini

                                                  或扫描下方二维码添加小编微信

                                                   




                                                  小伙砸,欢迎再看分享给其他小伙伴!共同进步!


                                                  文章转载自java学途,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                  评论