暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

DolphinScheduler自身容错导致的服务器持续崩溃重大问题的排查与解决

海豚调度 2025-01-10
234

01

问题复现


在DolphinScheduler中有如下一个Shell任务:
    current_timestamp() {  
        date +"%Y-%m-%d %H:%M:%S"  
    }


    TIMESTAMP=$(current_timestamp)
    echo $TIMESTAMP
    sleep 60

    在DolphinScheduler将工作流执行策略设置为并行:


    定时周期调度设置为10秒一次:
    将定时调度上线后,会调度执行任务,此时一切正常:
    此时将Master节点给kill掉,模拟宕机:
      $ jps
      1979710 AlertServer
      1979626 WorkerServer
      1979546 MasterServer
      1979794 ApiApplicationServer
      1980483 Jps
      $ kill -9 1979546
      去到DolphinScheduler中查看,发现Master已经不存在了:
      此时观察DolphinScheduler工作流执行,发现其不会继续调度任务执行了,并且所有的任务则会一直执行下去,直到报错。
      当过了一段时间后(模拟发现了宕机问题),此时重启DolphinScheduler:
        sh bin/stop-all.sh
        sh bin/start-all.sh
        重启完成后,就会将之前没有执行成功的任务,包括没有执行的调度任务,全部都执行一次:
        这就有一个致命的问题:如果都是高性能任务的话,就会导致CPU、内存被打满,从而让服务器整个宕机!!!


        02

        多场景测试

        • Master宕机后,重启整个DS:会产生上述问题。

        • Master宕机后,重启相应的Master:会产生上述问题。——有缺陷,官方没有单独的Master后台启动,只有前台启动的脚本,但可以重复执行start-all.sh。
        • Worker宕机后,重启整个DS:不会产生上述问题。——因为Master会持续的调度任务,而Worker宕机后的结果就是调度任务直接失败。
        • Worker宕机后,重启相应的Worker:不会产生上述问题。——有缺陷,官方没有单独的Worker后台启动,只有前台启动的脚本,但可以重复执行start-all.sh。
        • DS整个宕机后,重启整个DS:会产生上述问题。
        • DS使用stop-all.sh停止后,重启整个DS:会产生上述问题。

        其核心就是在于Master,只要配置了周期任务,无论Master是宕机还是调用脚本关闭的,其都会产生上述问题。


        03

        原理分析

        DolphinScheduler核心角色
        • MasterServer主要负责 DAG 任务切分、任务提交监控,并同时监听其它MasterServer和WorkerServer的健康状态。MasterServer服务启动时向Zookeeper注册临时节点,通过监听Zookeeper临时节点变化来进行容错处理。
        • WorkerServer主要负责任务的执行和提供日志服务。WorkerServer服务启动时向Zookeeper注册临时节点,并维持心跳。
        • ApiServer主要负责处理前端UI层的请求。

        大致的任务运行流程如下:
        • 在API-Server中创建任务,并将元数据持久化到DB中。
        • 通过手动点击或定时执行生成一个触发工作流执行的Command写入DB。
        • Master消费DB中的Command,开始执行工作流,并将工作流中的任务分发给Worker执行。
        • 当整个工作流执行结束之后,Master结束工作流的执行。
        参考官网,上述的DolphinScheduler核心任务执行流程可以细化为如下:
        鉴于任务调度的复杂性,一个大的流程可以划分为小的流程,在主线流程之外还附加了支线流程,下面对执行调度流程拆分进行分析一下,这样更容易理解:
        在本次问题中,主要关注的就是Command分发流程。其Command分发流程是一个异步分布式生产消费模式。
            i.首先是生产者api-server,会将用户的运行工作流http请求封装成command数据,insert到t_ds_command表中,如下是一个启动工作流实例的command样例(老版本):
          {
              "commandType": "START_PROCESS",
              "processDefinitionCode": 14285512555584,
              "executorId": 1,
              "commandParam": "{}",
              "taskDependType": "TASK_POST",
              "failureStrategy": "CONTINUE",
              "warningType": "NONE",
              "startTime": 1723444881372,
              "processInstancePriority": "MEDIUM",
              "updateTime": 1723444881372,
              "workerGroup": "default",
              "tenantCode": "default",
              "environmentCode": -1,
              "dryRun": 0,
              "processInstanceId": 0,
              "processDefinitionVersion": 1,
              "testFlag": 0
          }
              ii.其次是消费者,master server中的MasterSchedulerBootstrap loop程序, MasterSchedulerBootstrap使用ZK分配到自己的slot,从t_ds_command表中select属于slot的command列表处理,其查询语句是:
            <select id="queryCommandPageBySlot" resultType="org.apache.dolphinscheduler.dao.entity.Command">
                    select *
                    from t_ds_command
                    where id % #{masterCount} = #{thisMasterSlot}
                    order by process_instance_priority, id asc
                        limit #{limit}
            </select>
                iii.MasterSchedulerBootstrap loop轮训查到待处理的command任务,将command任务和master host生成ProcessInstance,将ProcessInstance对象插入到t_ds_process_instance表中, 同时生成包含运行所需要的上下文信息的可执行任务workflowExecuteRunnable。将workflowExecuteRunnablecache到本地cache processInstanceExecCacheManager,同时生产将ProcessInstance的WorkflowEventType.START_WORKFLOW生产到workflowEventQueue队列中。




            上面的步骤是用户在Web页面点击启动任务后的流程,而本次的问题是Master周期调度的问题。经过查阅资料,周期调度任务则是MasterServer将其封装为命令数据并插入t_ds_process_instance表中,后续步骤如上,大致流程如下:


            • 命令分发:以用户提交的工作流请求为触发,MasterServer将其封装为命令数据并插入数据库中。
            • 任务分配:MasterServer循环查询待处理的命令,依照负载情况将任务分配到对应的ProcessInstance中。
            • 任务执行:根据DAG的依赖关系,WorkerServer会优先执行无依赖的任务,然后根据优先级逐步执行其他任务。
            • 状态反馈:任务执行过程中,WorkerServer会定期回调MasterServer,通知任务的进展和执行状态。

            所以,上述的问题就在这,Master从停止到启动时,t_ds_command中会产生大量的任务数据
            在DolphinScheduler3.2.1中,其t_ds_command数据样例为:
              id  |command_type|process_definition_code|process_definition_version|process_instance_id|command_param                        |task_depend_type|failure_strategy|warning_type|warning_group_id|schedule_time      |start_time         |executor_id|update_time        |process_instance_priority|worker_group|tenant_code|environment_code|dry_run|test_flag|
              ----+------------+-----------------------+--------------------------+-------------------+-------------------------------------+----------------+----------------+------------+----------------+-------------------+-------------------+-----------+-------------------+-------------------------+------------+-----------+----------------+-------+---------+
              1988|           6|         15921642898976|                         4|                  0|{"schedule_timezone":"Asia/Shanghai"}|               2|               1|           0|               0|2024-12-11 00:36:40|2024-12-11 00:39:01|          2|2024-12-11 00:39:01|                        2|default     |default    |              -1|      0|        0|
              1989|           6|         15921642898976|                         4|                  0|{"schedule_timezone":"Asia/Shanghai"}|               2|               1|           0|               0|2024-12-11 00:36:50|2024-12-11 00:39:01|          2|2024-12-11 00:39:01|                        2|default     |default    |              -1|      0|        0|
              1990|           6|         15921642898976|                         4|                  0|{"schedule_timezone":"Asia/Shanghai"}|               2|               1|           0|               0|2024-12-11 00:37:00|2024-12-11 00:39:01|          2|2024-12-11 00:39:01|                        2|default     |default    |              -1|      0|        0|
              command_type的枚举由源码中的CommandType定义,其内容如下:
                /**
                 * command types
                 * 0 start a new process
                 * 1 start a new process from current nodes
                 * 2 recover tolerance fault process
                 * 3 recover suspended process
                 * 4 start process from failure task nodes
                 * 5 complement data
                 * 6 start a new process from scheduler
                 * 7 repeat running a process
                 * 8 pause a process
                 * 9 stop a process
                 * 10 recover waiting thread
                 * 11 recover serial wait
                 * 12 start a task node in a process instance
                 */
                START_PROCESS(0, "start a new process"),
                START_CURRENT_TASK_PROCESS(1, "start a new process from current nodes"),
                RECOVER_TOLERANCE_FAULT_PROCESS(2, "recover tolerance fault process"),
                RECOVER_SUSPENDED_PROCESS(3, "recover suspended process"),
                START_FAILURE_TASK_PROCESS(4, "start process from failure task nodes"),
                COMPLEMENT_DATA(5, "complement data"),
                SCHEDULER(6, "start a new process from scheduler"),
                REPEAT_RUNNING(7, "repeat running a process"),
                PAUSE(8, "pause a process"),
                STOP(9, "stop a process"),
                RECOVER_WAITING_THREAD(10, "recover waiting thread"),
                RECOVER_SERIAL_WAIT(11, "recover serial wait"),
                EXECUTE_TASK(12, "start a task node in a process instance"),
                DYNAMIC_GENERATION(13, "dynamic generation"),
                ;
                那为什么会这样呢?本质上是Master自身的容错机制造成的,其容错机制可以细分为如下几个模块:
                • 1)Master自身的容错:如果是多Master同时运行,其中一个作为Active Master负责处理任务调度请求,其他节点作为Standby Master。当Active Master出现故障时,Standby Master将自动接管其工作,确保系统的正常运行。这是通过ZooKeeper实现的,ZooKeeper负责选举Active Master节点,并监控节点的状态。
                • 2)状态同步:多Master节点之间会进行状态同步,以确保在Active Master宕机时,Standby Master能够接管任务调度。
                • 3)故障恢复:当Master节点宕机后,其他Master节点会通过ZooKeeper的Watcher机制监听到这一事件,并触发故障恢复。
                • 4)正在运行任务的容错:当前Master节点宕机后,新Master会通过已下线的Master的地址和正在运行的工作流状态数组获取需要容错的ProcessInstance列表,之后将其放入t_ds_command表中(后续流程就是Master获取到并调度+Worker执行了)。
                • 5)分布式锁:在容错过程中,Master节点会使用ZK分布式锁+采用指定command表分配ID的形式来确保只有一个Master节点执行容错操作,避免多个Master节点同时接管同一个任务。
                • 6)定时容错线程:除了ZooKeeper的事件触发容错外,DolphinScheduler还实现了一个定时线程FailoverExecuteThread,用于Master重启后恢复自身之前的工作流实例。
                • 7)任务重试:DolphinScheduler还支持任务失败后的重试机制,这与服务宕机容错相辅相成,确保任务的最终执行成功。

                所以,此时根据原理+复现可以初步推测出,是在Master启动时的某一个线程进行的定时容错,接下来就进入源码来真正验证一下。

                04

                源码解析

                在org.apache.dolphinscheduler.server.master.MasterServer下,启动Master时会有run入口:
                  /**
                   * run master server
                   */
                  @PostConstruct
                  public void run() throws SchedulerException {
                      init rpc server
                      this.masterRPCServer.start();
                      install task plugin
                      this.taskPluginManager.loadPlugin();
                      this.masterSlotManager.start();
                      self tolerant
                      this.masterRegistryClient.start();
                      this.masterRegistryClient.setRegistryStoppable(this);
                      this.masterSchedulerBootstrap.start();
                      this.eventExecuteService.start();
                      this.failoverExecuteThread.start();
                      this.schedulerApi.start();
                      this.taskGroupCoordinator.start();
                      MasterServerMetrics.registerMasterCpuUsageGauge(() -> {
                          SystemMetrics systemMetrics = metricsProvider.getSystemMetrics();
                          return systemMetrics.getTotalCpuUsedPercentage();
                      });
                      MasterServerMetrics.registerMasterMemoryAvailableGauge(() -> {
                          SystemMetrics systemMetrics = metricsProvider.getSystemMetrics();
                          return (systemMetrics.getSystemMemoryMax() - systemMetrics.getSystemMemoryUsed())  1024.0 / 1024 / 1024;
                      });
                      MasterServerMetrics.registerMasterMemoryUsageGauge(() -> {
                          SystemMetrics systemMetrics = metricsProvider.getSystemMetrics();
                          return systemMetrics.getJvmMemoryUsedPercentage();
                      });
                      Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                          if (!ServerLifeCycleManager.isStopped()) {
                              close("MasterServer shutdownHook");
                          }
                      }));
                  }
                  通过上面的代码,可以看到Master启动执行了:
                  • masterRPCServer.start():初始化并启动RPC服务器,用于节点间通信。
                  • taskPluginManager.loadPlugin():加载任务插件,这些插件可以扩展DolphinScheduler的任务类型。
                  • masterSlotManager.start():启动Master的Slot管理器,它负责管理Master的资源槽位,用于任务调度。
                  • masterRegistryClient.start():启动Master的注册客户端,它负责将Master节点注册到分布式协调服务(如ZooKeeper)中。
                  • masterRegistryClient.setRegistryStoppable(this):设置注册客户端的可停止对象,以便在Master停止时能够进行清理工作。
                  • masterSchedulerBootstrap.start():启动Master的调度引导服务,它负责初始化调度相关的服务。
                  • eventExecuteService.start():启动事件执行服务,它负责处理工作流中的事件,如任务状态变化。
                  • failoverExecuteThread.start():启动故障恢复执行线程,它负责在Master宕机后恢复任务执行。
                  • schedulerApi.start():启动调度API服务,提供调度相关的接口供外部调用。
                  • taskGroupCoordinator.start():启动任务组协调器,它负责协调任务组内的任务执行。

                  经过源码探查,发现最关键的failoverExecuteThread不是重新执行未调度的周期任务,而是容错未执行完的任务。并且其他源码中也没有关于恢复周期任务调度的内容。
                  那现在需要换一个思路,就是从下往上走:
                  1. 首先发现重启恢复后,Web页面上的“运行类型”是“调度执行”,而数据库的“command_type”是“6”,那就意味着必须有一个服务会有往数据库里面去插入command_type为6的方法。并且其会去获取t_ds_schedules表中的任务定时调度实例。
                  2. 根据源码,排查到dolphinscheduler-dao项目下会存放所有的数据库操作DAO,遂可以找到ScheduleMapper类,此类是和t_ds_schedules相关的DAO类;之后根据t_ds_command反查,找到了CommandServiceImpl类中的createCommand方法;再根据两者反查+command_type为6,找到了ProcessScheduleTask类中的executeInternal方法。
                  3. ProcessScheduleTask类中的executeInternal方法,同时满足:获取了调度任务、插入command数据、类型为6这三个条件。
                  4. 查看ProcessScheduleTask的executeInternal源码,前半部分是从Quartz上下文中获取到预定义的调度时间和调度实际运行时间,下半部分是校验这个调度Cron是否存在和上线。
                  5. 在executeInternal中,最关键的其实就是scheduledFireTime和fireTime。

                  找到这里的话,我们再结合DolphinScheduler+Quartz总结一下调度的原理:
                  • Web页面设置调度,其会通过SchedulerController中的createSchedule()来创建调度,并往t_ds_schedules中插入一条数据;
                  • Web页面设置调度上线,其会通过QuartzScheduler中的insertOrUpdateScheduleTask()向Quartz中创建Trigger触发器,并往QRTZ_CRON_TRIGGERS中插入一条数据;
                  • 之后定期调用ProcessScheduleTask中的executeInternal()来往t_ds_command中插入数据;
                  • 之后就是Master-Worker的执行流程了;

                  了解了大致的调度流程后,结合源码中的scheduledFireTime和fireTime,就可以推断出调度时间不是由DolphinScheduler设置的,而是由Quartz设置的。
                  那就继续查阅Quartz相关的资料,发现在Quartz中有一个misfire机制:周期性任务A需要在某个规定的时间执行,但是由于某种原因导致任务A未执行,称为MisFire。
                  而Quartz判断一个任务是MisFire,提供了一个配置项:org.quartz.jobStore.misfireThreshold,默认是60000ms(即60秒)。
                  misfire产生需要有2个前置条件:
                  • 一个是job到达触发时间时没有被执行;
                  • 二是被执行的延迟时间超过了Quartz配置的misfireThreshold阀值;

                  如果延迟执行的时间小于阀值,则Quartz不认为发生了misfire,立即执行job;如果延迟执行的时间大于或者等于阀值,则被判断为misfire,然后会按照指定的策略来执行。
                  那misfire产生的原因一般如下:
                  • 当job达到触发时间时,所有线程都被其他job占用,没有可用线程。;
                  • 在job需要触发的时间点,scheduler停止了(可能是意外停止的);【——当前的问题属于这种类型】
                  • job使用了@DisallowConcurrentExecution注解,job不能并发执行,当达到下一个job执行点的时候,上一个任务还没有完成;
                  • job指定了过去的开始执行时间,例如当前时间是8点00分00秒,指定开始时间为7点00分00秒;

                  而判定了任务是MisFire后,会有一个补偿机制,补偿机制只有在任务确认为MisFire状态后,才会被执行。补偿机制配置在Quartz源码的Trigger中:
                    public interface Trigger extends Serializable, Cloneable, Comparable<Trigger> {
                        long serialVersionUID = -3904243490805975570L;
                        int MISFIRE_INSTRUCTION_SMART_POLICY = 0;
                        int MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY = -1;
                        int DEFAULT_PRIORITY = 5;
                        ......
                    但是这个补偿机制需要根据Trigger来判定,如下是不同的Trigger:
                    在DolphinScheduler中,各种类型的Trigg都会涉及到:
                    Trigger的类型:
                    • SimpleTrigger是一个简单的触发器,用于执行重复任务。它可以指定一个起始时间,然后按照固定的间隔时间重复执行任务,直到达到指定的重复次数。SimpleTrigger的属性包括重复间隔(repeatInterval)和重复次数(repeatCount),实际执行次数是repeatCount + 1,因为在开始时间(startTime)时会执行一次。
                    • CronTrigger:CronTrigger使用Cron表达式来定义复杂的调度计划。Cron表达式由6或7个空格分隔的时间字段组成,分别表示秒、分、小时、一个月中的日期、月份、一周中的日期和可选的年份。CronTrigger允许设定非常复杂的触发时间表,基本上覆盖了其他触发器的绝大部分能力。
                    • CalendarIntervalTrigger:CalendarIntervalTrigger指定从某一个时间开始,以一定的时间间隔执行的任务。不同于SimpleTrigger只支持毫秒单位的时间间隔,CalendarIntervalTrigger支持的间隔单位有秒、分钟、小时、天、月、年。它适合的任务类似于每周执行一次。
                    • DailyTimeIntervalTrigger:DailyTimeIntervalTrigger指定每天的某个时间段内,以一定的时间间隔执行任务。并且它可以支持指定星期。它适合的任务类似于每天9:00到18:00,每隔70秒执行一次,并且只要周一至周五执行。
                    • ......

                    所以,因为不同的Trigger类型其参数是不一样的,所以当Trigger触发Misfire机制时,根据Trigger的不同,策略也会不同:
                      /**
                       公共的Misfire机制,在Trigger类中
                       **/
                       // 这是一个智能策略,Quartz会根据Trigger的类型自动选择一个合适的misfire策略。对于CronTrigger,默认使用MISFIRE_INSTRUCTION_FIRE_ONCE_NOW。
                      int MISFIRE_INSTRUCTION_SMART_POLICY = 0;
                      // 这个策略会将所有错过的触发事件,立即执行所有补偿动作。即使定时任务执行的时间已经结束,它也会把所有应该执行的任务一次性全部执行完。
                      int MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY = -1;
                      /**
                       SimpleTrigger的Misfire机制,在SimpleTrigger类中
                       **/
                       // 如果触发器错过了预定的触发时间,这个策略会立即执行一次任务,然后按照原计划继续执行后续的任务。
                      int MISFIRE_INSTRUCTION_FIRE_NOW = 1;
                      // 这个策略会将触发器的开始时间设置为当前时间,并立即执行错过的任务,包括已经错过的重复次数。
                      int MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT = 2;
                      // 类似于MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT,但是会忽略已经错过的触发次数,只执行剩余的重复次数。
                      int MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_REMAINING_REPEAT_COUNT = 3;
                      // 这个策略会忽略已经错过的触发次数,并在下一个预定的触发时间执行任务,执行剩余的重复次数。
                      int MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT = 4;
                      // 类似于MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT,但是会包括所有错过的重复次数。
                      int MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_EXISTING_COUNT = 5;
                      /** 
                      CronTrigger的Misfire机制,在CronTrigger类中
                      **/
                      // 如果触发器错过了预定的触发时间,这个策略会立即执行一次任务,然后按照原计划继续执行后续的任务。
                      int MISFIRE_INSTRUCTION_FIRE_ONCE_NOW = 1;
                      // 对于CronTrigger,这个策略会忽略所有错过的触发事件,直接等待下一次预定的触发时间。
                      int MISFIRE_INSTRUCTION_DO_NOTHING = 2;
                      ......
                      QuartzScheduler的insertOrUpdateScheduleTask()中,用的只有CronTrigger,其源码如下:
                        CronTrigger cronTrigger = newTrigger()
                                .withIdentity(triggerKey)
                                .startAt(startDate)
                                .endAt(endDate)
                                .withSchedule(
                                        cronSchedule(cronExpression)
                                                .withMisfireHandlingInstructionIgnoreMisfires()
                                                .inTimeZone(DateUtils.getTimezone(timezoneId)))
                                .forJob(jobDetail).build();
                        // 往下走
                        public CronScheduleBuilder withMisfireHandlingInstructionIgnoreMisfires() {
                            this.misfireInstruction = -1;
                            return this;
                        }
                        其补偿机制采用的-1编码,也就是会将所有错过的触发事件,立即执行所有补偿动作。所以此时就可以解释,为什么Master重启后,会将所有的未执行的周期任务,全部执行一次!!!
                        这个设置根据Trigger的不同,也可以分别设置不同的参数:
                          /**
                           CronTrigger,引用CronScheduleBuilder中的设置
                           **/
                          public CronScheduleBuilder withMisfireHandlingInstructionIgnoreMisfires() {
                              this.misfireInstruction = -1;
                              return this;
                          }
                          public CronScheduleBuilder withMisfireHandlingInstructionDoNothing() {
                              this.misfireInstruction = 2;
                              return this;
                          }
                          public CronScheduleBuilder withMisfireHandlingInstructionFireAndProceed() {
                              this.misfireInstruction = 1;
                              return this;
                          }
                          /**
                           SimpleTrigger,引用SimpleScheduleBuilder的设置
                          **/
                          public SimpleScheduleBuilder withMisfireHandlingInstructionIgnoreMisfires() {
                              this.misfireInstruction = -1;
                              return this;
                          }
                          public SimpleScheduleBuilder withMisfireHandlingInstructionFireNow() {
                              this.misfireInstruction = 1;
                              return this;
                          }
                          public SimpleScheduleBuilder withMisfireHandlingInstructionNextWithExistingCount() {
                              this.misfireInstruction = 5;
                              return this;
                          }
                          public SimpleScheduleBuilder withMisfireHandlingInstructionNextWithRemainingCount() {
                              this.misfireInstruction = 4;
                              return this;
                          }
                          public SimpleScheduleBuilder withMisfireHandlingInstructionNowWithExistingCount() {
                              this.misfireInstruction = 2;
                              return this;
                          }
                          public SimpleScheduleBuilder withMisfireHandlingInstructionNowWithRemainingCount() {
                              this.misfireInstruction = 3;
                              return this;
                          }

                          05

                          解决方案

                          • 将任务设置为“串行等待”。——可行,但无法发挥出大数据集群的并行化优势。并且有一个致命的缺陷,就是串行等待的任务无法在页面数手动停止,需要去到t_ds_process_instance中更改状态或删除数据。
                          • Master HA:单机Master时,对Master设置守护,宕机后自动拉起(但也有无法拉起的时候);多机Master时,部署多台Master,使其可以实现HA。
                          • DolphinScheduler监控告警:持续监控DolphinScheduler运行状态,当有角色宕机后,及时发出告警信息(会有半夜宕机而运维人员没有及时发现告警信息的情况)。
                          • 设置DolphinScheduler的CPU和内存使用阈值:在配置文件中,默认的CPU和内存阈值是70%,以为着当服务器的CPU和内存占用达到了70%后,DolphinScheduler就不会在这台服务器上调度任务了。这种方式的好处是可以保证服务器资源不被打满,弊端是如果Master容错的旧任务打满了资源,那就会影响DolphinScheduler正常状态下的新任务了。并且有的任务是非常关键的任务,必须要跑成功的。
                          • 设置DolphinScheduler的任务数:在配置文件中,DolphinScheduler默认的任务数是单Worker100个,单Master是1000个。而在现网中,无法对任务数去做到精细的控制,并且DolphinScheduler也无法做到自动调配。
                          • 在宕机后重新启动前删除t_ds_command表中的数据:经过验证,Master在宕机后是不会往t_ds_command中写数据了。其会在重启启动后,将数据写到t_ds_command后执行,但其中的时间大概就1~2秒钟,手工无法去执行删除。
                          • 修改t_ds_process_instance中的数据:根据时间周期,修改t_ds_process_instance中所有这个范围内的工作流的状态,人工使其结束(但如果DolphinScheduler和元数据库在一台服务器上,容易DolphinScheduler启动后里面把服务器资源打满,造成无法操作元数据库了)。

                          上面的解决方案主要是分为:
                          • 避免或减少Master的宕机
                          • 在Master宕机后,不要运行MisFire的任务

                          首先是“避免或减少Master宕机”,这在生产环境中是很难做到的,计算机程序的假设就是100%会在某一个时刻产生某些问题,所以才有了各种微服务架构、高可用HA、多活、容灾等等机制。
                          其次是“不要运行MisFire的任务”,依照前面的解决方案,没有一个方案能解决这个问题。所以,根据之前的源码解析,需要考虑采用源码修改+重新编译打包的方式进行解决。

                          06

                          修改源码

                          将关键源码修改为:
                                        CronTrigger cronTrigger = newTrigger()
                                                .withIdentity(triggerKey)
                                                .startAt(startDate)
                                                .endAt(endDate)
                                                .withSchedule(
                                                        cronSchedule(cronExpression)
                                                                .withMisfireHandlingInstructionDoNothing()
                            //                                    .withMisfireHandlingInstructionIgnoreMisfires()
                                                                .inTimeZone(DateUtils.getTimezone(timezoneId)))
                                                .forJob(jobDetail).build();

                            07

                            开发环境验证

                            使用Java8进行。
                            更改Master、worker、API下的application.yaml中的MySQL链接信息:
                              spring:
                                config:
                                  activate:
                                    on-profile: mysql
                                datasource:
                                  driver-class-name: com.mysql.cj.jdbc.Driver
                                  url: jdbc:mysql://IP地址:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8&useSSL=false
                                  username: 账号
                                  password: 密码
                                quartz:
                                  properties:
                                    org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
                              更改Master、Worker、API下的Zookeeper信息:
                                registry:
                                  type: zookeeper
                                  zookeeper:
                                    namespace: dolphinscheduler_dev
                                    connect-string: IP地址:2181
                                    retry-policy:
                                      base-sleep-time: 60ms
                                      max-sleep: 300ms
                                      max-retries: 5
                                    session-timeout: 30s
                                    connection-timeout: 9s
                                    block-until-connected: 600ms
                                    digest: ~
                                更改bom下面的pom:
                                              <dependency>
                                                  <groupId>mysql</groupId>
                                                  <artifactId>mysql-connector-java</artifactId>
                                                  <version>${mysql-connector.version}</version>
                                  <!--                <scope>test</scope>-->
                                              </dependency>
                                  更改api、master、worker下的logback-spring.xml,开启运行日志:
                                        <root level="INFO">
                                    <!--        <if condition="${DOCKER:-false}">-->
                                    <!--            <then>-->
                                    <!--                <appender-ref ref="STDOUT"/>-->
                                    <!--            </then>-->
                                    <!--        </if>-->
                                            <appender-ref ref="STDOUT"/>
                                            <appender-ref ref="APILOGFILE"/>
                                        </root>
                                        
                                        <root level="INFO">
                                    <!--        <if condition="${DOCKER:-false}">-->
                                    <!--            <then>-->
                                    <!--                <appender-ref ref="STDOUT"/>-->
                                    <!--            </then>-->
                                    <!--        </if>-->
                                            <appender-ref ref="STDOUT"/>
                                            <appender-ref ref="TASKLOGFILE"/>
                                            <appender-ref ref="MASTERLOGFILE"/>
                                        </root>
                                        <root level="INFO">
                                    <!--        <if condition="${DOCKER:-false}">-->
                                    <!--            <then>-->
                                    <!--                <appender-ref ref="STDOUT"/>-->
                                    <!--            </then>-->
                                    <!--        </if>-->
                                            <appender-ref ref="STDOUT"/>
                                            <appender-ref ref="TASKLOGFILE"/>
                                            <appender-ref ref="WORKERLOGFILE"/>
                                        </root>
                                    启动 Master、Worker、Api:
                                    • Master VM Options:-Dlogging.config=classpath:logback-spring.xml -Ddruid.mysql.usePingMethod=false -Dspring.profiles.active=mysql
                                    • Worker VM Options:-Dlogging.config=classpath:logback-spring.xml -Ddruid.mysql.usePingMethod=false -Dspring.profiles.active=mysql
                                    • Api VM Options:-Dlogging.config=classpath:logback-spring.xml -Dspring.profiles.active=api,mysql


                                    如果报错:
                                      Error running 'ApiApplicationServer'
                                      Error running ApiApplicationServer.
                                      Command line is too long. Shorten the command line via JAR manifest or via a classpath file and rerun.
                                      则加入:
                                      如果还是报错缺少MySQL的JDBC驱动包,则在Master、Woker、API的Pom下面添加:
                                        <dependency>
                                            <groupId>mysql</groupId>
                                            <artifactId>mysql-connector-java</artifactId>
                                            <version>8.0.33</version>
                                        </dependency>

                                        08

                                        整体编译打包

                                        需要注意,此时打包的项目,需要只是经过了《修改源码》环境的,不是进行了《开发环境验证》环节的!!!
                                        使用Java8进行。
                                        在项目根目录下执行命令,打包时间较长:
                                        mvn spotless:apply clean package -Dmaven.test.skip=true -Prelease
                                        打包后的二进制文件在dolphinscheduler-dist/target 下 bin.tar.gz 后缀文件。
                                        之后就可以尝试重新部署,验证是否解决上面的问题了。

                                        09

                                        只编译单个模块

                                        去到dolphinscheduler-scheduler-quartz根目录下,执行:
                                        mvn spotless:apply clean package -Dmaven.test.skip=true -Prelease
                                        打包后的文件在dolphinscheduler-scheduler-quartz/target目录下:
                                        将其在服务器上进行替换:
                                          su dolphinscheduler -


                                          mv /opt/module/dolphinscheduler-3.2.1/master-server/libs/dolphinscheduler-scheduler-quartz-3.2.1.jar /opt/module/dolphinscheduler-3.2.1/master-server/libs/dolphinscheduler-scheduler-quartz-3.2.1.jar.bak
                                          mv /opt/module/dolphinscheduler-3.2.1/api-server/libs/dolphinscheduler-scheduler-quartz-3.2.1.jar /opt/module/dolphinscheduler-3.2.1/api-server/libs/dolphinscheduler-scheduler-quartz-3.2.1.jar.bak


                                          cp dolphinscheduler-scheduler-quartz-3.2.1.jar /opt/module/dolphinscheduler-3.2.1/master-server/libs/dolphinscheduler-scheduler-quartz-3.2.1.jar
                                          cp dolphinscheduler-scheduler-quartz-3.2.1.jar /opt/module/dolphinscheduler-3.2.1/api-server/libs/dolphinscheduler-scheduler-quartz-3.2.1.jar


                                          chown -R dolphinscheduler:dolphinscheduler /opt/module/dolphinscheduler-3.2.1/
                                          之后就可以尝试重启DolphinScheduler,验证是否解决上面的问题了。

                                          10

                                          问题解决

                                          再次进行问题复现,发现问题已经被解决了:
                                          至此,本次问题排查及修复完成。
                                          <🐬🐬 >

                                          推荐阅读

                                          用户实践案例
                                          奇富科技  腾讯音乐 联通数科 拈花云科
                                          蔚来汽车 长城汽车 集度 长安汽车
                                          思科网讯 食行生鲜 联通医疗 联想
                                          新网银行 唯品富邦消费金融  蜀海供应链 
                                          自如 有赞 伊利 当贝大数据
                                          珍岛集团 传智教育 Bigo
                                          YY直播  作业帮 太美医疗
                                          某新能源 中电信翼康
                                          迁移实践
                                          Azkaban   Ooize(当贝迁移案例)   
                                          Airflow (有赞迁移案例) 
                                          Air2phin(迁移工具)
                                          Airflow迁移实践

                                          新手入门
                                          选择Apache DolphinScheduler的10个理由
                                          Apache DolphinScheduler 3.1.8 保姆级教程【安装、介绍、项目运用、邮箱预警设置】轻松拿捏!
                                          Apache DolphinScheduler 如何实现自动化打包+单机/集群部署?
                                          DolphinScheduler快速上手:基于Docker Compose的安装与配置全攻略
                                          Apache DolphinScheduler 在大数据环境中的应用与调优
                                          Apache DolphinScheduler-3.2.0集群部署教程

                                          < 🐬🐬 >
                                          参与社区

                                          参与Apache DolphinScheduler 社区有非常多的参与贡献的方式,包括:


                                          贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。

                                          社区汇总了以下适合新手的问题列表:https://github.com/apache/dolphinscheduler/issues/5689

                                          非新手问题列表:https://github.com/apache/dolphinscheduler/issues?
                                          q=is%3Aopen+is%3Aissue+label%3A%22volunteer+wanted%22

                                          如何参与贡献链接:https://dolphinscheduler.apache.org/zh-cn/community/development/contribute.html

                                          来吧,DolphinScheduler开源社区需要您的参与,为中国开源崛起添砖加瓦吧,哪怕只是小小的一块瓦,汇聚起来的力量也是巨大的!


                                          球分享

                                          球点赞

                                          球在看


                                          文章转载自海豚调度,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                          评论