BruceBat's Blog
BruceBat's Blog

竹杖芒鞋轻胜马,谁怕?一蓑烟雨任平生


  • 首页

  • 归档

  • 分类

  • 标签

  • 搜索
NIO I/O 计算机科学 操作系统 设计模式 随记 WebSocket 计算机网络 注册中心 经典电影 xxl-job 分布式 分布式任务调度 MySQL DevOps Docker 多线程 有趣的问题 Mybatis-Plus Mybatis Java 数据结构

分布式调度中间件xxl-job(六):调度中心--总览

发表于 2020-06-21 | 分类于 Java | 0 | 阅读次数 2679
❝

人生苦短,不如养狗

❞

一、前言

  在前面我们曾经说过,如果将常见的任务调度中间件分为 「中心化」 和 「去中心化」 两个流派的话,那么 xxl-job 可以说是中心化的典型代表。而 xxl-job 中心化最重要的一个组成部分就是我们下面要介绍的 「调度中心」 。
  在最新版的 xxl-job 架构图中,我们可以看到调度中心提供了诸如任务管理、执行器管理、日志管理、任务调度/路由、失败告警等等功能,具体可以参考下面的架构图:

  可以看到调度中心提供的功能还是相当多的。不过马克主义哲学的矛盾分析法告诉我们,看问题要抓主要矛盾的主要方面。抛开日志管理、失败报警等运维相关的功能,调度中心最核心的功能应该是如下三个:

  • 服务注册发现(包括执行器注册发现、任务注册发现)
  • 任务调度/路由机制
  • 失败重试机制

  下面我们通过调度中心启动过程来一窥上面三个功能是如何运作的。

二、初探调度中心

  前段时间火了一个名词 「“时间管理者”」 。仔细想想,管理着众多定时任务的调度中心何尝不也是一个优秀的时间管理者吗?下面让我们来欣赏一下这位时间管理者的时间管理秘籍:

public void init() throws Exception {
    // init i18n
    initI18n();

    // admin registry monitor run
    JobRegistryMonitorHelper.getInstance().start();

    // admin fail-monitor run
    JobFailMonitorHelper.getInstance().start();

    // admin lose-monitor run
    JobLosedMonitorHelper.getInstance().start();

    // admin trigger pool start
    JobTriggerPoolHelper.toStart();

    // admin log report start
    JobLogReportHelper.getInstance().start();

    // start-schedule
    JobScheduleHelper.getInstance().start();

    logger.info(">>>>>>>>> init xxl-job admin success.");
}

1. 任务注册发现

  在 JobRegistryMonitorHelper.getInstance().start(); 方法中开启了一个任务注册发现监控器的保护线程。在这个保护线程中,调度中心会进行任务组的发现注册,这一过程主要是检查并去除已经离线的执行器实例(一个应用会有多个实例),重新刷新执行器组在线实例地址以及更新任务组的注册地址。核心代码逻辑如下:

// auto registry group
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
if (groupList!=null && !groupList.isEmpty()) {

 // remove dead address (admin/executor)
 List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
 if (ids!=null && ids.size()>0) {
  XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
 }

 // 刷新执行器在线地址 (admin/executor)
 HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
 List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
 if (list != null) {
  for (XxlJobRegistry item: list) {
   if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
    String appname = item.getRegistryKey();
    List<String> registryList = appAddressMap.get(appname);
    if (registryList == null) {
     registryList = new ArrayList<String>();
    }

    if (!registryList.contains(item.getRegistryValue())) {
     registryList.add(item.getRegistryValue());
    }
    appAddressMap.put(appname, registryList);
   }
  }
 }

 // fresh group address
 for (XxlJobGroup group: groupList) {
  List<String> registryList = appAddressMap.get(group.getAppname());
  String addressListStr = null;
  if (registryList!=null && !registryList.isEmpty()) {
   Collections.sort(registryList);
   addressListStr = "";
   for (String item:registryList) {
    addressListStr += item + ",";
   }
   addressListStr = addressListStr.substring(0, addressListStr.length()-1);
  }
  group.setAddressList(addressListStr);
  XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
 }
}

  这里调度中心是通过检查 xxl_job_registry 中是否存在90秒钟没有更新的任务注册信息(即检查90秒内执行器是否发送心跳),如果没有进行更新,则认为该执行器实例已下线。然后重新更新执行器在线实例地址以及任务组的注册地址(该注册地址用于调度中心进行任务触发使用)。

2. 任务失败丢失处理

  在 JobFailMonitorHelper.getInstance().start(); 和 JobLosedMonitorHelper.getInstance().start(); 两个方法中主要是对 「执行失败」 的任务和 「丢失」 的任务(执行器实例在执行过程中被重启或者下线,导致任务执行线程被终止,最终任务丢失)进行相应的处理。
  首先,来看下 JobFailMonitorHelper 。其实,xxl-job 对于执行失败的任务并没有类似 dubbo 提供的多种failover机制,对于失败的任务其实就是根据用户在调度中心设置的任务失败重试次数进行简单的失败重试,如果重试此处达到设置的重试次数则放弃重试。
  而对于由于执行器示例在执行任务过程中被重启或者下线导致的任务丢失长时间没有将任务结果回调调度中心的情况,在 JobLosedMonitorHelper 做出了如下处理:

// 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
Date losedTime = DateUtil.addMinutes(new Date(), -10);
List<Long> losedJobIds  = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);

if (losedJobIds!=null && losedJobIds.size()>0) {
 for (Long logId: losedJobIds) {

  XxlJobLog jobLog = new XxlJobLog();
  jobLog.setId(logId);

  jobLog.setHandleTime(new Date());
  jobLog.setHandleCode(ReturnT.FAIL_CODE);
  jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );

  XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(jobLog);
 }
}

  调度中心会进行调度任务是否停留 「“运行状态”」 超过10分钟以及对应执行器实例是否已经离线的判断。如果符合判断条件,则会主动将任务置为失败,然后按照任务执行失败处理。

3. 任务调度

  作为任务调度中间件最重要的一环,调度中心在 JobScheduleHelper.getInstance().start(); 方法中进行了任务的调度逻辑处理。
  在 JobScheduleHelper.getInstance().start(); 方法中开启了一个任务调度的守护线程,在这个线程中每五秒会执行一次任务调度。针对调度中心使用HA模式时,xxl-job使用了争抢数据库锁的方式来让每个调度中心实例争抢任务调度权(这是为了避免重复触发任务)。具体代码如下:

conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);

preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();

  在进行任务调度时,调度中心采取的是预处理方式。即预先读取指定数量的任务进行调度。这里预先读取的时间范围为 「任务下次执行时间下小于当前时间加5秒」 ,数量为 「线程池大小*触发器qps(每个触发器花费50ms,qps = 1000/50 = 20)」 。但是实际计算预读取数量的时候使用的是快触发器线程大小+慢触发器线程大小。具体源码如下:

// 预读取任务数量
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

// 预读取任务调度列表,这里PRE_READ_MS=5000ms
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);

  对于任务调度列表中下次执行时间调度中心又作出了如下处理:

  • 当前时间 > (下次触发时间 + 5s) : 直接丢弃任务调度。
  • 当前时间 > 下次触发时间 : 直接触发任务,并重新计算任务下次触发时间,如果在预读取范围内,则放置到 ringData 内。
  • 当前时间 <= 下次触发时间 : 将任务插入到 ringData等待任务执行。(该逻辑为主要任务调度逻辑分支,以上两种为特殊情况)。

三、总结

  上面就是对 xxl-job 调度中心的三个核心功能的简单分析(细心的同学一定发现 JobTriggerPoolHelper还没有讲,这部分由于较为重要,闲鱼准备单开一章分析 )。除了这三个较为核心的功能,xxl-job 的调度中心还提供了诸如任务失败报警、日志管理、报表大盘等运维功能,这些优秀的运维功能也是闲鱼为什么想要使用 xxl-job 的重要原因,后续闲鱼会挑选其中比较重要的功能进行介绍。
  以上纯属个人浅见,如有谬误,请各位看官大佬多多指正。

本文使用 mdnice 排版

brucebat wechat
一个闲鱼程序猿的微信公众号
  • 本文作者: brucebat
  • 本文链接: https://www.swzgeek.com/archives/分布式调度中间件xxl-job六调度中心--总览
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# NIO # I/O # 计算机科学 # 操作系统 # 设计模式 # 随记 # WebSocket # 计算机网络 # 注册中心 # 经典电影 # xxl-job # 分布式 # 分布式任务调度 # MySQL # DevOps # Docker # 多线程 # 有趣的问题 # Mybatis-Plus # Mybatis # Java # 数据结构
分布式调度中间件xxl-job(五):执行器Executor--任务执行
分布式调度中间件xxl-job(七):调度器Trigger
  • 文章目录
  • 站点概览
brucebat

brucebat

一个有梦想的咸鱼程序猿

46 日志
8 分类
22 标签
RSS
Github E-mail
Creative Commons
© 2020 — 2023 brucebat
苏ICP备20002207号-1

苏公网安备 32011302320859号

0%