分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业失效转移

老艿艿 芋道源码 2018-11-14

点击上方“芋道源码”,选择“置顶公众号”

技术文章第一时间送达!

源码精品专栏

 


摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/job-failover/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文基于 Elastic-Job V2.1.5 版本分享

  • 1. 概述

  • 2. 作业节点崩溃监听

  • 3. 作业失效转移

  • 4. 获取作业分片上下文集合

  • 5. 监听作业失效转移功能关闭

  • 666. 彩蛋


1. 概述

本文主要分享 Elastic-Job-Lite 作业失效转移

当作业节点执行作业异常崩溃时,其所分配的作业分片项在下次重新分片之前不会被重新执行。开启失效转移功能后,这部分作业分片项将被其他作业节点抓取后“执行”。为什么此处的执行打引号呢?😈下文我们会分享到噢,卖个关子。

笔者对失效转移理解了蛮久时间,因此引用官方对它的解释,让你能更好的理解:

来源地址:https://my.oschina.net/u/719192/blog/506062 
失效转移: 运行中的作业服务器崩溃不会导致重新分片,只会在下次作业启动时分片。启用失效转移功能可以在本次作业执行过程中,监测其他作业服务器空闲,抓取未完成的孤儿分片项执行。 
-- 分隔符 -- 
来源地址:http://dangdangdotcom.github.io/elastic-job/elastic-job-lite/03-design/lite-design/ 
实现失效转移功能,在某台服务器执行完毕后主动抓取未分配的分片,并且在某台服务器下线后主动寻找可用的服务器执行任务。

这样看概念可能还是比较难理解,代码搞起来!

涉及到主要类的类图如下( 打开大图 ):

  • 粉色的类在 com.dangdang.ddframe.job.lite.internal.failover 包下,实现了 Elastic-Job-Lite 作业失效转移。

  • FailoverService,作业失效转移服务。

  • FailoverNode,作业失效转移数据存储路径。

  • FailoverListenerManager,作业失效转移监听管理器。

你行好事会因为得到赞赏而愉悦 
同理,开源项目贡献者会因为 Star 而更加有动力 
为 Elastic-Job 点赞!传送门

2. 作业节点崩溃监听

当作业节点崩溃时,监听器 JobCrashedJobListener 会监听到该情况,进行作业失效转移处理。

// JobCrashedJobListener.java
class JobCrashedJobListener extends AbstractJobListener {

   @Override
   protected void dataChanged(final String path, final Type eventType, final String data) {
       if (isFailoverEnabled() && Type.NODE_REMOVED == eventType
               && instanceNode.isInstancePath(path)) { // /${JOB_NAME}/instances/${INSTANCE_ID}
           String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
           if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
               return;
           }
           List failoverItems = failoverService.getFailoverItems(jobInstanceId); // /${JOB_NAME}/sharding/${ITEM_ID}/failover
           if (!failoverItems.isEmpty()) {
               for (int each : failoverItems) {
                   failoverService.setCrashedFailoverFlag(each);
                   failoverService.failoverIfNecessary();
               }
           } else {
               for (int each : shardingService.getShardingItems(jobInstanceId)) { // /${JOB_NAME}/sharding/${ITEM_ID}/instance
                   failoverService.setCrashedFailoverFlag(each);
                   failoverService.failoverIfNecessary();
               }
           }
       }
   }
}
  • 通过判断 /${JOB_NAME}/instances/${INSTANCE_ID} 被移除,执行作业失效转移逻辑。❓说好的作业节点崩溃呢?经过确认,目前这块存在 BUG,未判断作业节点是否为奔溃。所以在当前版本,作业失效转移面向的是所有作业节点关闭逻辑,不仅限于作业崩溃关闭。

  • 优先调用 FailoverService#getFailoverItems(...) 方法,获得关闭作业节点( ${JOB_INSTANCE_ID} )对应的 ${JOB_NAME}/sharding/${ITEM_ID}/failover 作业分片项。

    若该作业分片项为空,再调用 ShardingService#getShardingItems(...) 方法,获得关闭作业节点( ${JOB_INSTANCE_ID} )对应的 /${JOB_NAME}/sharding/${ITEM_ID}/instance 作业分片项。

    为什么是这样的顺序呢?放在 FailoverService#failoverIfNecessary() 一起讲。这里先看下 FailoverService#getFailoverItems(...) 方法的实现:

    // FailoverService
    public List getFailoverItems(final String jobInstanceId) {
       List items = jobNodeStorage.getJobNodeChildrenKeys(ShardingNode.ROOT);
       List result = new ArrayList<>(items.size());
       for (String each : items) {
           int item = Integer.parseInt(each);
           String node = FailoverNode.getExecutionFailoverNode(item); // ${JOB_NAME}/sharding/${ITEM_ID}/failover
           if (jobNodeStorage.isJobNodeExisted(node) && jobInstanceId.equals(jobNodeStorage.getJobNodeDataDirectly(node))) {
               result.add(item);
           }
       }
       Collections.sort(result);
       return result;
    }
  • 调用 FailoverService#setCrashedFailoverFlag(...) 方法,设置失效的分片项标记 /${JOB_NAME}/leader/failover/items/${ITEM_ID}。该数据节点为永久节点,存储空串( "")。

    // FailoverService.java
    public void setCrashedFailoverFlag(final int item) {
       if (!isFailoverAssigned(item)) {
           jobNodeStorage.createJobNodeIfNeeded(FailoverNode.getItemsNode(item)); // /¨E123EJOB¨E95ENAME¨E125E/leader/failover/items/¨E123EJOB¨E95ENAME¨E125E/leader/failover/items/{ITEM_ID}
       }
    }

    private boolean isFailoverAssigned(final Integer item) {
       return jobNodeStorage.isJobNodeExisted(FailoverNode.getExecutionFailoverNode(item));
    }
  • 调用 FailoverService#failoverIfNecessary() 方法,如果需要失效转移, 则执行作业失效转移。

  • 3. 作业失效转移

    调用 FailoverService#failoverIfNecessary() 方法,如果需要失效转移, 则执行作业失效转移。

    // FailoverService.java
    public void failoverIfNecessary() {
       if (needFailover()) {
           jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
       }
    }
    • 调用 #needFailover() 方法,判断是否满足失效转移条件。

      private boolean needFailover() {
                  // ${JOB_NAME}/leader/failover/items/${ITEM_ID} 有失效转移的作业分片项
          return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT) && !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty()
                  // 当前作业不在运行中
                  && !JobRegistry.getInstance().isJobRunning(jobName);
      }
      • 条件一:${JOB_NAME}/leader/failover/items/${ITEM_ID} 有失效转移的作业分片项。

      • 条件二:当前作业不在运行中。此条件即是上文提交的作业节点空闲的定义。

        失效转移: 运行中的作业服务器崩溃不会导致重新分片,只会在下次作业启动时分片。启用失效转移功能可以在本次作业执行过程中,监测其他作业服务器【空闲】,抓取未完成的孤儿分片项执行

    • 调用 JobNodeStorage#executeInLeader(…) 方法,使用 FailoverNode.LATCH/${JOB_NAME}/leader/failover/latch ) 路径构成的分布式锁,保证 FailoverLeaderExecutionCallback 的回调方法同一时间,即使多个作业节点调用,有且仅有一个作业节点进行执行。另外,虽然 JobNodeStorage#executeInLeader(…) 方法上带有 Leader 关键字,实际非必须在主节点的操作,任何一个拿到分布式锁的作业节点都可以调用。目前和分布式锁相关的逻辑,在 Elastic-Job-Lite 里,都会调用 JobNodeStorage#executeInLeader(…) 方法,数据都存储在 /leader/ 节点目录下。关于分布式锁相关的,在《Elastic-Job-Lite 源码分析 —— 注册中心》「3.1 在主节点执行操作」有详细分享。


    FailoverLeaderExecutionCallback 回调逻辑如下:

    class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {

       @Override
       public void execute() {
           // 判断需要失效转移
           if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
               return;
           }
           // 获得一个 `${JOB_NAME}/leader/failover/items/${ITEM_ID}` 作业分片项
           int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
           log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
           // 设置这个 `${JOB_NAME}/sharding/${ITEM_ID}/failover` 作业分片项 为 当前作业节点
           jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
           // 移除这个 `${JOB_NAME}/leader/failover/items/${ITEM_ID}` 作业分片项
           jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
           // TODO 不应使用triggerJob, 而是使用executor统一调度 疑问:为什么要用executor统一,后面研究下
           // 触发作业执行
           JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
           if (null != jobScheduleController) {
               jobScheduleController.triggerJob();
           }
       }
    }
    • 再次调用 #needFailover() 方法,确保经过分布式锁获取等待过程中,仍然需要失效转移。因为可能多个作业节点调用了该回调,第一个作业节点执行了失效转移,可能第二个作业节点就不需要执行失效转移了。

    • 调用 JobNodeStorage#getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT)#get(0) 方法,获得一个 ${JOB_NAME}/leader/failover/items/${ITEM_ID} 作业分片项。

      调用 JobNodeStorage#fillEphemeralJobNode(...) 方法,设置这个临时数据节点 ${JOB_NAME}/sharding/${ITEM_ID}failover 作业分片项为当前作业节点( ${JOB_INSTANCE_ID} )。

      调用 JobNodeStorage#removeJobNodeIfExisted(...) 方法,移除这个${JOB_NAME}/leader/failover/items/${ITEM_ID} 作业分片项。

    • 调用 JobScheduleController#triggerJob() 方法,立即启动作业。调用该方法,实际作业不会立即执行,而仅仅是进行触发。如果有多个失效转移的作业分片项,多次调用 JobScheduleController#triggerJob() 方法会不会导致作业是并行执行的?答案是不会,因为一个作业的 Quartz 线程数设置为 1。

      // JobScheduler.java
      private Properties getBaseQuartzProperties() {
         Properties result = new Properties();
         // ... 省略无关代码
         result.put("org.quartz.threadPool.threadCount""1"); // Quartz 线程数:1
         // ... 省略无关代码
         return result;
      }

    如果说作业分片项实现转移时,每个作业节点都不处于非空闲状态,岂不是 FailoverLeaderExecutionCallback 一直无法被回调?答案当然不是的。作业在执行完分配给自己的作业分片项,会调用 LiteJobFacade#failoverIfNecessary() 方法,进行失效转移的作业分片项抓取:

    public final void execute() {
       // ...  省略无关代码

       // 执行 普通触发的作业
       execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
       // 执行 被跳过触发的作业
       while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
           jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
           execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
       }

       // 执行 作业失效转移
       jobFacade.failoverIfNecessary();

       // ...  省略无关代码
    }

    // LiteJobFacade.java
    @Override
    public void failoverIfNecessary() {
       if (configService.load(true).isFailover()) {
           failoverService.failoverIfNecessary();
       }
    }

    // FailoverService.java
    public void failoverIfNecessary() {
       if (needFailover()) {
           jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
       }
    }

    让我们在翻回 JobCrashedJobListener 处代码,为什么获取失效转移的作业分片项是这样的优先顺序?一个作业节点拥有 ${JOB_NAME}/sharding/${ITEM_ID}/failover 数据分片项,意味着分配给它的作业分片项已经执行完成,否则怎么回调 FailoverLeaderExecutionCallback 方法,抓取失效转移的作业分片项呢?!

    旁白君:双击666,关注笔者公众号一波。

    此处 JobFacade#failoverIfNecessary() 方法,只会抓取一个失效转移的作业分片,这样带来的好处是,多个作业分片可以一起承担执行失效转移的分片集合。举个例子:一个作业集群有 A / B / C 三个节点,分成六个作业分片,如果 C 节点挂了,A / B 节点分担 C 节点的两个分片。但是,也可能会存在失效转移的分片被执行。举个例子:一个作业集群有 A / B / C 三个节点,分成九个作业分片,如果 C 节点挂了,A / B 节点分担 C 节点的两个分片,有一个被漏掉,只能等下次作业分片才能执行。未来这块算法会进行优化。

    4. 获取作业分片上下文集合

    在《Elastic-Job-Lite 源码分析 —— 作业执行》「4.2 获取当前作业服务器的分片上下文」中,我们可以看到作业执行器( AbstractElasticJobExecutor ) 执行作业时,会获取当前作业服务器的分片上下文进行执行。获取过程总体如下顺序图( 打开大图 ):

    • 红色叉叉在《Elastic-Job-Lite 源码解析 —— 作业分片》有详细分享。

    实现代码如下:

    // LiteJobFacade.java
    @Override
    public ShardingContexts getShardingContexts() {
       // 获得 失效转移的作业分片项
       boolean isFailover = configService.load(true).isFailover();
       if (isFailover) {
           List failoverShardingItems = failoverService.getLocalFailoverItems();
           if (!failoverShardingItems.isEmpty()) {
               // 【忽略,作业分片详解】获取当前作业服务器分片上下文
               return executionContextService.getJobShardingContext(failoverShardingItems);
           }
       }
       // 【忽略,作业分片详解】作业分片,如果需要分片且当前节点为主节点
       shardingService.shardingIfNecessary();
       // 【忽略,作业分片详解】获得 分配在本机的作业分片项
       List shardingItems = shardingService.getLocalShardingItems();
       // 移除 分配在本机的失效转移的作业分片项目
       if (isFailover) {
           shardingItems.removeAll(failoverService.getLocalTakeOffItems());
       }
       // 移除 被禁用的作业分片项
       shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
       // 【忽略,作业分片详解】获取当前作业服务器分片上下文
       return executionContextService.getJobShardingContext(shardingItems);
    }
    • 调用 FailoverService#getLocalFailoverItems() 方法,获取运行在本作业节点的失效转移分片项集合。

      // FailoverService.java
      public List getLocalFailoverItems() {
         if (JobRegistry.getInstance().isShutdown(jobName)) {
             return Collections.emptyList();
         }
         return getFailoverItems(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId()); // ${JOB_NAME}/sharding/${ITEM_ID}/failover
      }
    • 调用 ExecutionContextService#getJobShardingContext() 方法,获取当前作业服务器分片上下文。在《Elastic-Job-Lite 源码解析 —— 作业分片》「4. 获取作业分片上下文集合」有详细解析。

    • 当本作业节点不存在抓取的失效转移分片项,则获得分配给本作业分解的作业分片项。此时你会看到略奇怪的方法调用,shardingItems.removeAll(failoverService.getLocalTakeOffItems())。为什么呢?举个例子,作业节点A持有作业分片项[0, 1],此时异常断网,导致[0, 1]被作业节点B失效转移抓取,此时若作业节点A恢复,作业分片项[0, 1]依然属于作业节点A,但是可能已经在作业节点B执行,因此需要进行移除,避免多节点运行相同的作业分片项。FailoverService#getLocalTakeOffItems() 方法实现代码如下:

      // FailoverService.java
      /**
      * 获取运行在本作业服务器的被失效转移的序列号.

      @return 运行在本作业服务器的被失效转移的序列号
      */

      public List getLocalTakeOffItems() {
         List shardingItems = shardingService.getLocalShardingItems();
         List result = new ArrayList<>(shardingItems.size());
         for (int each : shardingItems) {
             if (jobNodeStorage.isJobNodeExisted(FailoverNode.getExecutionFailoverNode(each))) {
                 result.add(each);
             }
         }
         return result;
      }

    5. 监听作业失效转移功能关闭

    class FailoverSettingsChangedJobListener extends AbstractJobListener {

       @Override
       protected void dataChanged(final String path, final Type eventType, final String data) {
           if (configNode.isConfigPath(path) && Type.NODE_UPDATED == eventType
                   && !LiteJobConfigurationGsonFactory.fromJson(data).isFailover()) { // 关闭失效转移功能
               failoverService.removeFailoverInfo();
           }
       }
    }

    666. 彩蛋

    旁白君:啊啊啊,有点绕。 
    芋道君:耐心,耐心,耐心。

    道友,赶紧上车,分享一波朋友圈!




    如果你对 Dubbo / Netty 等等源码与原理感兴趣,欢迎加入我的知识星球一起交流。长按下方二维码噢


    目前在知识星球更新了《Dubbo 源码解析》目录如下:

    01. 调试环境搭建
    02. 项目结构一览
    03. 配置 Configuration
    04. 核心流程一览

    05. 拓展机制 SPI

    06. 线程池

    07. 服务暴露 Export

    08. 服务引用 Refer

    09. 注册中心 Registry

    10. 动态编译 Compile

    11. 动态代理 Proxy

    12. 服务调用 Invoke

    13. 调用特性 

    14. 过滤器 Filter

    15. NIO 服务器

    16. P2P 服务器

    17. HTTP 服务器

    18. 序列化 Serialization

    19. 集群容错 Cluster

    20. 优雅停机

    21. 日志适配

    22. 状态检查

    23. 监控中心 Monitor

    24. 管理中心 Admin

    25. 运维命令 QOS

    26. 链路追踪 Tracing

    ... 一共 69+ 篇

    目前在知识星球更新了《Netty 源码解析》目录如下:

    01. 调试环境搭建
    02. NIO 基础
    03. Netty 简介
    04. 启动 Bootstrap

    05. 事件轮询 EventLoop

    06. 通道管道 ChannelPipeline

    07. 通道 Channel

    08. 字节缓冲区 ByteBuf

    09. 通道处理器 ChannelHandler

    10. 编解码 Codec

    11. 工具类 Util

    ... 一共 61+ 篇


    目前在知识星球更新了《数据库实体设计》目录如下:


    01. 商品模块
    02. 交易模块
    03. 营销模块
    04. 公用模块

    ... 一共 17+ 篇


    目前在知识星球更新了《Spring 源码解析》目录如下:


    01. 调试环境搭建
    02. IoC Resource 定位
    03. IoC BeanDefinition 载入

    04. IoC BeanDefinition 注册

    05. IoC Bean 获取

    06. IoC Bean 生命周期

    ... 一共 35+ 篇


    源码不易↓↓↓

    点赞支持老艿艿↓↓



      本站仅按申请收录文章,版权归原作者所有
      如若侵权,请联系本站删除
      觉得不错,分享给更多人看到