为什么要学习分布式调度XXL-Job

XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展

多数互联网公司里面用的技术、周期性业务如小溪推送、支付对账、数据统计等、离不开分布式调度

在多数互联网公司中,分布式调度XXL-Job占有率很高,是这几年大量流行

可以作为公司内部培训技术分享必备知识

有谁在用

众多互联网大厂

https://www.xuxueli.com/

你知道多少种定时任务的实现方式

定时任务

通过时间表达式这一方式来进行任务调度的被称为定时任务

分类

单机定时任务

 单机的容易实现,但应用于集群环境做分布式部署,就会带来重复执行

解决方案就有很多比如加锁、数据库等、但是增加了很多非业务逻辑

分部署调度

把需要处理的计划任务放入到统一的平台,实现集群管理调度与分部署部署的定时任务叫分布式定任务

支持集群部署、高可用、并行调度、分片处理等

常见的定时任务

单机:Java自带的java.util.Timer类配置比较麻烦、时间延后问题

单机:ScheduledExcetorService

是基于线程池来进行设计的定时任务类,在这里每个调度的任务都会分配到线程池里的一个线程去执行改任务,并发任务,互不影响

单机:SpringBoot框架自带

SpringBoot使用注解方式开启定时任务

启动类里面@EnableScheduling开启定时任务,自动扫描

定时任务业务类加注解@Component被容器扫描

定时执行的方法加上注解@Scheduled(fixedRate=2000)定期执行一次

为什么需要任务调度平台

在Java中,传统的定时任务实现方案,比如Timer,Quartz等都或多或少存在一些问题:

        不支持集群、不支持统计、没有管理平台、没有失败报警、没有监控等等 而且在现在分布式的架构中,有一些场景需要分布式任务调度:同一个服务多个实例的任务存在互斥时,需要统一的调度。         任务调度需要支持高可用、监控、故障告警。需要统一管理和追踪各个服务节点任务调度的结果,需要记录保存任务属性信息等。         显然传统的定时任务已经不满足现在的分布式架构,所以需要一个分布式任务调度平台,目前比较主流的是elasticjob和xxl-job。

常见的分布式调度平台快速认知

常见分布式定时任务

Quartz

Quartz关注点在于定时任务而非是数据,并没有一套根据数据化处理而定的流程

虽然可以实现数据库作业的高可用,但是缺少了分布式的并行调度功能相对弱点

不支持任务分片,没有UI界面管理,并行调度,失败策略也缺少

TBSchedule

这个是阿里巴巴早期开源的分布式调度系统,使用的是tomer而不是线程池执行调度任务,使用timer在处理异常的时候是有缺陷的但TBSchedule的作业类型比较单一,文档缺失得也比较严重

目前阿里内部使用的是ScheduleX

Elastic-job

当当开发的分布式任务调度系统,功能强大,采用的是zookeeper实现分布式协调,具有高可用分片。

2020年6月,ElasticJob的四个子项目已经正式迁入Apache仓库

由2个相互独立的子项目ElasticJob-Lite和Elastic-Cloud组成

ElasticJoc-Lite定位为轻量化无中心化解决方案,使用jar的形式提供分布式任务的协调服务;

Elastic-Cloud使用Mesos的解决方案,额外提供资源治理,应用分发以及进程隔离等服务

XXL-JOB

大众点评的员工徐雪里在15年发布的分布式调度平台是轻量级的分布式任务调度框架,目标是开发迅速,简单,清理,易扩展;老版本是依赖quartz的定时任务,在V2.1.0版本开始,移除quartz依赖

常用对比图

 如何选择哪一个分布式任务调度平台

XXL-Job和Elastic-Job都具有广泛的用户基础和完善的技术文档,都可以满足定时任务的基本功能需求

XXL-Job侧重在业务实现简单和管理方便,容易学习,失败与路由策略丰富推荐使用在用户基数相对较少,服务器的数量在一定的范围内的场景下使用

Elastic-job关注的点在数据,添加了弹性扩容和数据分片的思路,更方便利用分布式服务器的资源,但是学习难度较大,推荐数据量庞大,服务器数量多的时候用

带你走进分布式调度XXL-Job的特性

什么是XXL-Job

XXL-JOB

大众点评的员工徐雪里在15年发布的分布式任务调度平台,是轻量级的分布式任务调度框架,目标是开发迅速,简单,清理,易扩展,老版本是依赖quartz的定时任务触发,在V2.1.0版本开始移除quartz依赖

官网地址:https://www.xuxueli.com/xxl-job

GitHub地址:https://github.com/xuxueli/xxl-job/

xxl-job的设计思想

将调度行为抽象形成"调度中心"公共平台,而平台自身并不承担业务逻辑,"调度中心"负责发起调度请求

将任务抽象成分散的obHandler,交由"执行器"统一管理‘

"执行器"负责接收调度请求并执行对应的JobHandler中业务逻辑

因此,”调度“和”任务“两部分可以相互解耦,提高系统整体稳定性和扩展性

架构图

调度中心

负责管理调度的信息,按照调度的配置来发出调度请求

支持可视化、简单的动态管理调度信息,包括新建,删除,更新等,这些操作都会实时生效,同时也支持监控调度结果以及执行日志

执行器

负责接受请求并且执行任务的逻辑,任务模块专注于任务的执行操作等,使得开发和维护更加的简单和高效。

 xxl具有那些特性

调度中心HA:调度采用了中心式 进行设计,"调度中心"支持集群部署,可保证调度中心HA

执行器HA:任务分布式的执行,任务执行器支持集群部署,可保证执行HA

触发策略,有cron触发,固定间隔触发,固定延时触发,API事件触发,人工触发,父子类任务触发

路由策略:执行器在集群部署的时候提供了丰富的路由策略:如:第一个、最后一个、轮询、随机、一致性、HASH、最不经常使用LFU、最久未使用LRU、故障转移等

故障转移:如果执行器集群的一台机器发生故障、会自动切换到一台正常的执行器发送任务调度

Rolling实时日志的监控支持rolling方式查看输出的完整的执行日志

脚本任务:支持GLUE模式开发和运行脚本任务,包括Shell、python、node.js、php等脚本

XXL源码部署和数据库建立

下载地址:https://github.com/xuxueli/xxl-job/releases

安装步骤1:在MYSQL中导入必要的数据库和表

位置:xxl-job-2.3.1.tar.gz\xxl-job-2.3.1\doc\db

确保数据库生成8张表:

修改admin的配置:

位置:xxl-job-2.3.1.tar.gz\xxl-job-2.3.1\xxl-job-admin\src\main\resources\application.properties

主要修改位置:

 修改执行器配置

位置:xxl-job-2.3.1.tar.gz\xxl-job-2.3.1\xxl-job-executor-samples\xxl-job-executor-sample-springboot\src\main\resources\application.properties

主要修改:修改端口号

配置 xxl.job.accessToken(后续要配置客户端接入配置token)

xxl.job.accessToken=xdclass.net

 在解压的文件下执行:mvn package

 会在xxl-job-admin下生成文件夹targe里面有可执行的jar包

 还有xxl-job-2.3.1\xxl-job-executor-samples\xxl-job-executor-sample-springboot\target下生成的jar包

 启动

执行: nohup java -jar 可执行jar包

2.命令:java -jar xxx.jar &   此语句比第一个命令多一个&符号,但是Ctrl+c或者关闭窗口后 后台程序仍然继续执行。

3,执行java -jar xxx.jar后,然后ctrl+z 退出到控制台,执行 bg 再执行exit命令。完成以上3步,退出shell后,jar服务仍然在后台运行

4,命令nohup java -jar xxxx.jar &   和第二种方式相似 只不过在前面加上nohup  此种方式比较推荐。

nohup java -jar xxl-job-admin-2.3.1.jar &

nohub java -jar xxl-job-admin-2.3.1.jar &

打开

http://服务器地址:admin端口号/xxl-job-admin即可打开

默认账号密码:admin / 123456

查看端口号是否被占用

netstat -tln

netstat -tln | grep 9090

目录结构

doc:xxl-job的文档资料,包括了数据库的脚本(后面要用到)xxl-job-core:公共jar包依赖xxl-job-admin:调度中心,项目源码,是Springboot项目,可以直接启动xxl-job-executor-samples:执行器,是Sample实例项目,里面的Springboot工程可以直接启动,也可以在该项目的基础上进行开发,也可以将现有的项目改造成为执行器项目

三 表介绍 xxl_job的数据库里有如下几个表:

xxl_job_group:执行器信息表,用于维护任务执行器的信息xxl_job_info:调度扩展信息表,主要是用于保存xxl-job的调度任务的扩展信息,比如说像任务分组、任务名、机器的地址等等xxl_job_lock:任务调度锁表xxl_job_log:日志表,主要是用在保存xxl-job任务调度历史信息,像调度结果、执行结果、调度入参等等xxl_job_log_report:日志报表,会存储xxl-job任务调度的日志报表,会在调度中心里的报表功能里使用到xxl_job_logglue:任务的GLUE日志,用于保存GLUE日志的更新历史变化,支持GLUE版本的回溯功能xxl_job_registry:执行器的注册表,用在维护在线的执行器与调度中心的地址信息xxl_job_user:系统的用户表 分布式调度XXL-Job UI菜单模块介绍

运行报表

以图形化来展示了整体的任务执行情况

任务数量:能够看到调度中心运行的任务数量调度次数:调度中心所触发的调度次数执行器数量:在整个调度中心中,在线的执行器数量有多少

任务管理(配置执行任务)

示例执行器:所用到的执行器任务描述:概述该任务是做什么的路由策略:

第一个:选择第一个机器最后一个:选择最后一个机器轮询:依次选择执行随机:随机选择在线的机器一致性HASH:每个任务按照Hash算法固定选择某一台机器,并且所有的任务均匀散列在不同的机器上最不经常使用:使用频率最低的机器优先被使用最近最久未使用:最久未使用的机器优先被选举故障转移:按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标的执行器并且会发起任务调度忙碌转移:按照顺序来依次进行空闲检测,第一个空闲检测成功的机器会被选定为目标群机器,并且会发起任务调度分片广播:广播触发对于集群中的所有机器执行任务,同时会系统会自动传递分片的参数

Cron:执行规则调度过期策略:调度中心错过调度时间的补偿处理策略,包括:忽略、立即补偿触发一次等JobHandler:定义执行器的名字阻塞处理策略:

单机串行:新的调度任务在进入到执行器之后,该调度任务进入FIFO队列,并以串行的方式去进行丢弃后续调度:新的调度任务在进入到执行器之后,如果存在相同的且正在运行的调度任务,本次的调度任务请求就会被丢弃掉,并且标记为失败覆盖之前的调度:新的调度任务在进入到执行器之后,如果存在相同的且正在运行的调度任务,就会终止掉当前正在运行的调度任务,并且清空队列,运行新的调度任务。

子任务ID:输入子任务的任务id,可填写多个任务超时时间:添加任务超时的时候,单位s,设置时间大于0的时候就会生效失败重试次数:设置失败重试的次数,设置时间大于0的时候就会生效负责人:填写该任务调度的负责人报警邮件:出现报警,则发送邮件

 调度日志

这里是查看调度的日志,根据日志来查看任务具体的执行情况是怎样的

执行器管理

这里是配置执行器,等待执行器启动的时候都会被调度中心监听加入到地址列表

用户管理

可以对用户的一些操作  

SpringBoot整合XXL-Job

添加XXL-Job依赖

   com.xuxueli

   xxl-job-core

   2.3.0

 配置文件

#----------xxl-job配置--------------

#调度中心部署地址,多个配置逗号分隔 "http://address01,http://address02"

xxl.job.admin.addresses=http://39.97.100.141:7000/xxl-job-admin

#执行器token,非空时启用 xxl-job, access token

xxl.job.accessToken=default_token

# 执行器app名称,和控制台那边配置一样的名称,不然注册不上去

xxl.job.executor.appname=xxl-job-executor-sample

# [选填]执行器注册:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。

#从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。

xxl.job.executor.address=

#[选填]执行器IP :默认为空表示自动获取IP(即springboot容器的ip和端口,可以自动获取,也可以指定),多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务",

xxl.job.executor.ip=

# [选填]执行器端口号:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;

xxl.job.executor.port=9999

#执行器日志文件存储路径,需要对该路径拥有读写权限;为空则使用默认路径

xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler

#执行器日志保存天数

xxl.job.executor.logretentiondays=30

XxlJobConfig

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

 

/**

 * xxl-job config

 *

 * @author xuxueli 2017-04-28

 */

@Configuration

public class XxlJobConfig {

    private final Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

 

    @Value("${xxl.job.admin.addresses:}")

    private String adminAddresses;

 

    @Value("${xxl.job.accessToken:}")

    private String accessToken;

 

    @Value("${xxl.job.executor.appname}")

    private String appname;

 

    @Value("${xxl.job.executor.address}")

    private String address;

 

    @Value("${xxl.job.executor.ip}")

    private String ip;

 

    @Value("${xxl.job.executor.port}")

    private int port;

 

    @Value("${xxl.job.executor.logpath}")

    private String logPath;

 

    @Value("${xxl.job.executor.logretentiondays}")

    private int logRetentionDays;

 

 

    @Bean

    public XxlJobSpringExecutor xxlJobExecutor() {

        logger.info(">>>>>>>>>>> xxl-job config init.");

        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();

        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);

        xxlJobSpringExecutor.setAppname(appname);

        xxlJobSpringExecutor.setAddress(address);

        xxlJobSpringExecutor.setIp(ip);

        xxlJobSpringExecutor.setPort(port);

        xxlJobSpringExecutor.setAccessToken(accessToken);

        xxlJobSpringExecutor.setLogPath(logPath);

        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

 

        return xxlJobSpringExecutor;

    }

    /**

     * 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;

     *

     *      1、引入依赖:

     *          

     *             org.springframework.cloud

     *             spring-cloud-commons

     *             ${version}

     *        

     *

     *      2、配置文件,或者容器启动变量

     *          spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'

     *

     *      3、获取IP

     *          String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();

     */

 

 

}

每日任务

/**

 * 每日任务

 * 每日凌晨1点执行

 *

 * @author Chopper

 */

public interface EveryDayExecute {

 

    /**

     * 执行每日任务

     */

    void execute();

 

 

}

每刻任务

/**

 * 每分钟任务

 *

 * @author Chopper

 */

public interface EveryFifteenMinuteExecute {

 

    /**

     * 执行

     */

    void execute();

 

 

}

  每小时任务

/**

 * 每小时任务

 *

 * @author Chopper

 */

public interface EveryHourExecute {

 

    /**

     * 执行

     */

    void execute();

 

 

}

每分钟任务

/**

 * 每分钟任务

 *

 * @author Chopper

 */

public interface EveryMinuteExecute {

 

    /**

     * 执行

     */

    void execute();

 

 

}

定时器任务

/**

 * 定时器任务

 *

 * @author Chopper

 * @version v1.0

 

 */

@Slf4j

@Component

public class TimedTaskJobHandler {

 

    @Autowired(required = false)

    private List everyMinuteExecutes;

 

 

    @Autowired(required = false)

    private List everyHourExecutes;

 

 

    @Autowired(required = false)

    private List everyDayExecutes;

 

    @Autowired(required = false)

    private List everyFifteenMinuteExecute;

 

    /**

     * 每分钟任务

     *

     * @throws Exception

     */

    @XxlJob("everyMinuteExecute")

    public ReturnT everyMinuteExecute(String param)  {

        log.info("每分钟任务执行");

        if (everyMinuteExecutes == null || everyMinuteExecutes.size() == 0) {

            return ReturnT.SUCCESS;

        }

 

        for (int i = 0; i < everyMinuteExecutes.size(); i++) {

            try {

                everyMinuteExecutes.get(i).execute();

            } catch (Exception e) {

                log.error("每分钟任务异常", e);

            }

        }

        return ReturnT.SUCCESS;

    }

 

    /**

     * 每15分钟任务

     *

     * @throws Exception

     */

    @XxlJob("everyFifteenMinuteExecute")

    public ReturnT everyFifteenMinuteExecute(String param)  {

        log.info("每15分钟任务执行");

        if (everyFifteenMinuteExecute == null || everyFifteenMinuteExecute.size() == 0) {

            return ReturnT.SUCCESS;

        }

 

        for (int i = 0; i < everyFifteenMinuteExecute.size(); i++) {

            try {

                everyFifteenMinuteExecute.get(i).execute();

            } catch (Exception e) {

                log.error("每15分钟任务异常", e);

            }

        }

        return ReturnT.SUCCESS;

    }

 

    /**

     * 每小时任务

     *

     * @throws Exception

     */

    @XxlJob("everyHourExecuteJobHandler")

    public ReturnT everyHourExecuteJobHandler(String param) {

        log.info("每小时任务执行");

        if (everyHourExecutes == null || everyHourExecutes.size() == 0) {

            return ReturnT.SUCCESS;

        }

 

        for (int i = 0; i < everyHourExecutes.size(); i++) {

            try {

                everyHourExecutes.get(i).execute();

            } catch (Exception e) {

                log.error("每小时任务异常", e);

            }

        }

        return ReturnT.SUCCESS;

    }

 

    /**

     * 每日任务

     *

     * @throws Exception

     */

    @XxlJob("everyDayExecuteJobHandler")

    public ReturnT everyDayExecuteJobHandler(String param) {

 

        log.info("每日任务执行");

        if (everyDayExecutes == null || everyDayExecutes.size() == 0) {

            return ReturnT.SUCCESS;

        }

 

        for (int i = 0; i < everyDayExecutes.size(); i++) {

            try {

                everyDayExecutes.get(i).execute();

            } catch (Exception e) {

                log.error("每分钟任务异常", e);

            }

        }

        return ReturnT.SUCCESS;

    }

 

 

}

 示例

import cn.lili.modules.search.service.EsGoodsIndexService;

import cn.lili.timetask.handler.EveryFifteenMinuteExecute;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

 

@Slf4j

@Component

public class IndexOrderTaskExcecute implements EveryFifteenMinuteExecute {

 

    @Autowired

    private EsGoodsIndexService esGoodsIndexService;

 

    @Override

    public void execute() {

        log.info("====IndexOrderTaskExcecute=====,每隔15分钟生成一次索引");

        esGoodsIndexService.init();

    }

}

相关阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: