BruceBat's Blog
BruceBat's Blog

竹杖芒鞋轻胜马,谁怕?一蓑烟雨任平生


  • 首页

  • 归档

  • 分类

  • 标签

  • 搜索
NIO I/O 计算机科学 操作系统 设计模式 随记 WebSocket 计算机网络 注册中心 经典电影 xxl-job 分布式 分布式任务调度 MySQL DevOps Docker 多线程 有趣的问题 Mybatis-Plus Mybatis Java 数据结构

分布式调度中间件xxl-job(七):调度器Trigger

发表于 2020-07-06 | 分类于 Java | 0 | 阅读次数 1430
❝

人生苦短,不如养狗

❞

  上一篇中我们了解了调度中心除了调度器以外的基本功能,今天我们就来学习一下 xxl-job 中 「调度器(Trigger)」 是如何工作的。

一、简介

  在前面的学习我们可以看到,xxl-job 将 「时间调度」 和 「具体任务调度」 两个逻辑进行了拆分。在 JobScheduleHelper 中进行时间调度逻辑处理,将具体任务调度逻辑放置在 JobTriggerPoolHelper 中。所以这章我们主要学习 JobTriggerPoolHelper 中是如何进行具体任务调度的。
  先总体来看下 JobTriggerPoolHelper 提供的方法:

  从上面的方法中可以总结出 JobTriggerPoolHelper 大致提供了一下三个功能:

  • 启动处理线程
  • 终止处理线程
  • 进行任务触发

  下面我们就来看看 JobTriggerPoolHelper 具体的处理逻辑。

二、任务触发器 JobTriggerPoolHelper

1. 触发方式

  通过查看 JobTriggerPoolHelper 中 trigger() 方法的使用者,我们可以看到有一下五种触发任务的场景:

  • 在调度中心页面中触发一次任务;
  • 由调度中心根据时间调度进行任务触发;
  • 任务失败重新进行任务触发;
  • 父任务完成进行子任务触发;
  • 通过API调用进行任务触发;

  当然,我们也可以通过触发类型枚举类 TriggerTypeEnum 来查看:

/**
 * 人工触发
 */
MANUAL(I18nUtil.getString("jobconf_trigger_type_manual")),
/**
 * 根据cron表达式触发
 */
CRON(I18nUtil.getString("jobconf_trigger_type_cron")),
/**
 * 失败重试触发
 */
RETRY(I18nUtil.getString("jobconf_trigger_type_retry")),
/**
 * 父任务触发
 */
PARENT(I18nUtil.getString("jobconf_trigger_type_parent")),
/**
 * API调用触发
 */
API(I18nUtil.getString("jobconf_trigger_type_api"));

2. 快任务线程池和慢任务线程池

  由于在 xxl-job 使用了线程池来进行任务调度,一旦出现某个任务调度时间过长致使线程阻塞就会导致调度中心调度效率的下降。为了解决这一问题,xxl-job 创建了 「快任务线程池」 和 「慢任务线程池」 。
  一般情况,任务默认放置在快任务线程池中进行任务触发。这里 xxl-job 设置了一个任务触发时间窗口,长度为500ms。触发器在任务触发过程中每分钟检查当前任务已触发时间,如果超过时间窗口的次数超过10次,则会将该任务降级到慢任务线程池中。具体处理逻辑如下:

// 线程池选择逻辑
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {
    // job-timeout 10 times in 1 min
    triggerPool_ = slowTriggerPool;
}

...
// 时间窗口检查逻辑
// check timeout-count-map
long minTim_now = System.currentTimeMillis()/60000;
if (minTim != minTim_now) {
    minTim = minTim_now;
    jobTimeoutCountMap.clear();
}

// incr timeout-count-map
long cost = System.currentTimeMillis()-start;
if (cost > 500) {
    // ob-timeout threshold 500ms
    AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
    if (timeoutCount != null) {
        timeoutCount.incrementAndGet();
    }
}

3. 任务触发

  任务触发是通过快任务线程池/慢任务线程池调用 XxlJobTrigger 的 trigger() 方法实现。具体代码如下:

public static void trigger(int jobId,
                           TriggerTypeEnum triggerType,
                           int failRetryCount,
                           String executorShardingParam,
                           String executorParam,
                           String addressList) {

    // load data
    XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
    if (jobInfo == null) {
        logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
        return;
    }
    if (executorParam != null) {
        jobInfo.setExecutorParam(executorParam);
    }
    int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
    XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

    // cover addressList
    if (addressList!=null && addressList.trim().length()>0) {
        group.setAddressType(1);
        group.setAddressList(addressList.trim());
    }

    // sharding param
    int[] shardingParam = null;
    if (executorShardingParam!=null){
        String[] shardingArr = executorShardingParam.split("/");
        if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
            shardingParam = new int[2];
            shardingParam[0] = Integer.valueOf(shardingArr[0]);
            shardingParam[1] = Integer.valueOf(shardingArr[1]);
        }
    }
    if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
            && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
            && shardingParam==null) {
        for (int i = 0; i < group.getRegistryList().size(); i++) {
            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
        }
    } else {
        if (shardingParam == null) {
            shardingParam = new int[]{0, 1};
        }
        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
    }

}

  在 trigger() 方法中进行了如下处理:

  • 根据 jobId 加载任务信息,并在任务信息中加载执行参数;
  • 获取任务重试次数;
  • 重新检查执行器列表并更新;
  • 获取分片执行信息 executorShardingParam ,将其转化为分片参数数组,数据大小为2,第一个元素存放的是 「当前执行的任务分片index」 ,第二个元素存放的是 「任务分片的总数」 ;
  • 根据任务执行路由策略判断是否进行分片执行,如果进行分片执行,则按照当前执行器组的情况进行分片执行, 「分片总数等于执行器组中在线的机器数」 。如果不进行分片执行,则设置 「分片index为0」 ,分片 「总数为1」 ,然后进行任务执行。

三、总结

  至此,xxl-job 的主要处理逻辑已经全部学习完毕(终于水完了,完结撒花~~ )。此时回头看一看原先的文章,里面仍有不少地方分析的不够细致,但是作为基本认知入门应该还是可以的,吧?哈哈哈~~
  疫情虽然稳定,但形式仍旧严峻,希望大家还是要时刻警惕。最后祝各位身体健康,工作顺心,加油~~

本文使用 mdnice 排版

brucebat wechat
一个闲鱼程序猿的微信公众号
  • 本文作者: brucebat
  • 本文链接: https://www.swzgeek.com/archives/分布式调度中间件xxl-job七调度器trigger
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# NIO # I/O # 计算机科学 # 操作系统 # 设计模式 # 随记 # WebSocket # 计算机网络 # 注册中心 # 经典电影 # xxl-job # 分布式 # 分布式任务调度 # MySQL # DevOps # Docker # 多线程 # 有趣的问题 # Mybatis-Plus # Mybatis # Java # 数据结构
分布式调度中间件xxl-job(六):调度中心--总览
有趣的MySQL(二):“order by”引发的乱序
  • 文章目录
  • 站点概览
brucebat

brucebat

一个有梦想的咸鱼程序猿

46 日志
8 分类
22 标签
RSS
Github E-mail
Creative Commons
© 2020 — 2023 brucebat
苏ICP备20002207号-1

苏公网安备 32011302320859号

0%