分布式作业系统 Elastic-Job-Lite 源码分析 —— 自诊断修复

老艿艿 芋道源码 2018-12-11

点击上方“芋道源码”,选择“置顶公众号”

技术文章第一时间送达!

源码精品专栏

 

摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/reconcile/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文基于 Elastic-Job V2.1.5 版本分享

  • 1. 概述

  • 2. ReconcileService


1. 概述

本文主要分享 Elastic-Job-Lite 自诊断修复

在分布式的场景下由于网络、时钟等原因,可能导致 Zookeeper 的数据与真实运行的作业产生不一致,这种不一致通过正向的校验无法完全避免。需要另外启动一个线程定时校验注册中心数据与真实作业状态的一致性,即维持 Elastic-Job 的最终一致性

涉及到主要类的类图如下( 打开大图 ):

  • 在 Elastic-Job-lite 里,调解分布式作业不一致状态服务( ReconcileService ) 实现了自诊断修复功能。

你行好事会因为得到赞赏而愉悦 
同理,开源项目贡献者会因为 Star 而更加有动力 
为 Elastic-Job 点赞!传送门

2. ReconcileService

ReconcileService,调解分布式作业不一致状态服务。

ReconcileService 继承 Google Guava AbstractScheduledService 抽象类,实现 #scheduler()#runOneIteration() 方法,达到周期性校验注册中心数据与真实作业状态的一致性。

#scheduler() 方法实现如下

// ReconcileService.java
@Override
protected Scheduler scheduler() {
   return Scheduler.newFixedDelaySchedule(01, TimeUnit.MINUTES);
}
  • 每 1 分钟会调用一次 #runOneIteration() 方法进行校验。

  • Google Guava AbstractScheduledService 相关的知识,有兴趣的同学可以自己 Google 学习哟。

#runOneIteration() 方法实现如下

// ReconcileService.java
@Override
protected void runOneIteration() throws Exception {
   LiteJobConfiguration config = configService.load(true);
   int reconcileIntervalMinutes = null == config ? -1 : config.getReconcileIntervalMinutes();
   if (reconcileIntervalMinutes > 0 && (System.currentTimeMillis() - lastReconcileTime >= reconcileIntervalMinutes * 60 * 1000)) { // 校验是否达到校验周期
       // 设置最后校验时间
       lastReconcileTime = System.currentTimeMillis();
       if (leaderService.isLeaderUntilBlock() // 主作业节点才可以执行
               && !shardingService.isNeedSharding() // 当前作业不需要重新分片
               && shardingService.hasShardingInfoInOfflineServers()) { // 查询是包含有分片节点的不在线服务器
           log.warn("Elastic Job: job status node has inconsistent value,start reconciling...");
           // 设置需要重新分片的标记
           shardingService.setReshardingFlag();
       }
   }
}
  • 通过作业配置,设置修复作业服务器不一致状态服务调度间隔时间属性( LiteJobConfiguration.reconcileIntervalMinutes )。

  • 调用 ShardingService#setReshardingFlag() 方法,设置需要重新分片的标记。这个也是 ReconcileService 最本质的行为,有了这个标记后,作业会重新进行分片,达到作业节点本地分片数据与 Zookeeper 数据一致。作业分片逻辑,在《Elastic-Job-Lite 源码分析 —— 作业分片》有详细解析。

  • 调解分布式作业不一致状态服务一共有三个条件:

    • 调用 LeaderService#isLeaderUntilBlock() 方法,判断当前作业节点是否为主节点。在《Elastic-Job-Lite 源码分析 —— 主节点选举》有详细解析。

    • 调用 ShardingService#isNeedSharding() 方法,判断当前作业是否需要重分片。如果需要重新分片,就不要重复设置当前作业需要重新分片的标记。

    • 调用 ShardingService#hasShardingInfoInOfflineServers() 方法,查询是否包含有分片节点的不在线服务器。永久数据节点 /${JOB_NAME}/sharding/${ITEM_INDEX}/instance存储分配的作业节点主键( ${JOB_INSTANCE_ID} ), 不会随着作业节点因为各种原因断开后会话超时移除,而临时数据节点/${JOB_NAME}/instances/${JOB_INSTANCE_ID} 随着作业节点因为各种原因断开后超时会话超时移除。当查询到包含有分片节点的不在线的作业节点,设置需要重新分片的标记后进行重新分片,将其持有的作业分片分配给其它在线的作业节点。

      // ShardingService.java
       /**
       * 查询是包含有分片节点的不在线服务器.
       * 
       * @return 是包含有分片节点的不在线服务器
       */

      public boolean hasShardingInfoInOfflineServers() {
          List onlineInstances = jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT); // "/¨E123EJOB¨E95ENAME¨E125E/instances/¨E123EJOB¨E95ENAME¨E125E/instances/{JOB_INSTANCE_ID}"
          int shardingTotalCount = configService.load(true).getTypeConfig().getCoreConfig().getShardingTotalCount();
          for (int i = 0; i < shardingTotalCount; i++) {
              if (!onlineInstances.contains(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i)))) { // "/¨E123EJOB¨E95ENAME¨E125E/sharding/¨E123EJOB¨E95ENAME¨E125E/sharding/{ITEM_INDEX}/instance"
                  return true;
              }
          }
          return false;
      }




欢迎加入我的知识星球,一起探讨架构,交流源码。加入方式,长按下方二维码噢

已在知识星球更新源码解析如下:

  • 《精尽 Dubbo 源码解析系列》69 篇。

  • 《精尽 Netty 源码解析系列》61 篇。

  • 《精尽 Spring 源码解析系列》35 篇。

  • 《精尽 MyBatis 源码解析系列》34 篇。

  • 《数据库实体设计》17 篇。

  • 《精尽 Spring MVC 源码解析系列》15 篇。


目前在知识星球更新了《Dubbo 源码解析》目录如下:

01. 调试环境搭建
02. 项目结构一览
03. 配置 Configuration
04. 核心流程一览

05. 拓展机制 SPI

06. 线程池

07. 服务暴露 Export

08. 服务引用 Refer

09. 注册中心 Registry

10. 动态编译 Compile

11. 动态代理 Proxy

12. 服务调用 Invoke

13. 调用特性 

14. 过滤器 Filter

15. NIO 服务器

16. P2P 服务器

17. HTTP 服务器

18. 序列化 Serialization

19. 集群容错 Cluster

20. 优雅停机

21. 日志适配

22. 状态检查

23. 监控中心 Monitor

24. 管理中心 Admin

25. 运维命令 QOS

26. 链路追踪 Tracing

... 一共 69+ 篇

目前在知识星球更新了《Netty 源码解析》目录如下:

01. 调试环境搭建
02. NIO 基础
03. Netty 简介
04. 启动 Bootstrap

05. 事件轮询 EventLoop

06. 通道管道 ChannelPipeline

07. 通道 Channel

08. 字节缓冲区 ByteBuf

09. 通道处理器 ChannelHandler

10. 编解码 Codec

11. 工具类 Util

... 一共 61+ 篇


目前在知识星球更新了《数据库实体设计》目录如下:


01. 商品模块
02. 交易模块
03. 营销模块
04. 公用模块

... 一共 17+ 篇


目前在知识星球更新了《Spring 源码解析》目录如下:


01. 调试环境搭建
02. IoC Resource 定位
03. IoC BeanDefinition 载入

04. IoC BeanDefinition 注册

05. IoC Bean 获取

06. IoC Bean 生命周期

... 一共 35+ 篇


目前在知识星球更新了《MyBatis 源码解析》目录如下:


01. 调试环境搭建
02. 项目结构一览
03. MyBatis 面试题合集

04. MyBatis 学习资料合集

05. MyBatis 初始化

06. SQL 初始化

07. SQL 执行

08. 插件体系

09. Spring 集成

... 一共 34+ 篇


源码不易↓↓↓

点赞支持老艿艿↓↓

    阅读原文
    本站仅按申请收录文章,版权归原作者所有
    如若侵权,请联系本站删除
    觉得不错,分享给更多人看到