介绍
XXL-JOB任务调度平台分为2个部分,Scheduler和Executor。具体的实现Scheduler对应是xxl-job-admin,同时xxl-job-admin还配有web UI,可以配置管理任务。
Scheduler和Executor之间通过HTTP API交互,因此Executor可以通过各种语言实现。
图片
以上图为例,scheduleThread将任务通过Executor的/run api推送给Executor
{
"jobId": 3,
"executorHandler": "task.test",
"executorParams": "x=100",
"executorBlockStrategy": "SERIAL_EXECUTION",
"executorTimeout": 0,
"logId": 17,
"logDateTime": 1606100913829,
"glueType": "BEAN",
"glueSource": "",
"glueUpdatetime": 1606099566000,
"broadcastIndex": 0,
"broadcastTotal": 1
}
Executor会根据executorHandler找到对应的handler,执行完之后,又会调用xxl-job-admin的/xxl-job-admin/api/callback回报任务的执行结果。从上面的描述我们可以知道,xxl-job-admin和excutor都必须暴露出api服务(都是HTTP接口)。
Scheduler可以有多个。它们之间通过MySQL进行同步。
主要的调度逻辑在JobScheduleHelper中
在每一轮执行调度逻辑之前, Scheduler必须先获得行锁
while (!scheduleThreadToStop) {
...
// 加行锁
try {
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
...
} catch (Exception e) { ... } finally {
...
// 注意:锁必须正常释放
conn.commit();
...
}
由于xxl_job_lock 表中只有一条记录,所以这个逻辑与请求表锁类似,开销是比较大的。
其实这里还可以利用分治法的思想,让不同的任务对应到不同的行锁。来提高整体的并发度。依我推测, xxl-job 设计时考虑就是调度任务的数量不会太多。因此性能不是它的最主要关注点。
xxl-job内部没有使用Zookeeper这种数据库,因此在高可用性上与Quartz相比还是会稍微弱一些。好在它依赖少,搭建、学习的成本就会非常低。
对MySQL而言,如果xxl-job-admin在持有行锁的期间发生异常退出,与MySQL的连接断开。一段时间之后,MySQL会自动主动释放这个行锁。因此并不会出现死锁的问题。
接入指南
xxl-job-go-sdk
xxl-job go 客户端
支持
1.执行器注册
2.耗时任务取消
3.任务注册,像写http.Handler一样方便
4.任务panic处理
5.阻塞策略处理
6.任务完成支持返回执行备注
7.任务超时取消 (单位:秒,0为不限制)
8.失败重试次数(在参数param中,目前由任务自行处理)
9.可自定义日志
10.自定义日志查看handler
11.支持外部路由(可与gin集成)
How to get
go get github.com/mousycoder/xxl-job-go-sdk
Example
package main
import (
"fmt"
xxl “github.com/mousycoder/xxl-job-go-sdk"
“github.com/xxl-job-go-sdk/example/task"
"log"
)
func main() {
exec := xxl.NewExecutor(
xxl.ServerAddr("http://127.0.0.1/xxl-job-admin"),
xxl.AccessToken(""), //请求令牌(默认为空)
xxl.ExecutorIp("127.0.0.1"), //可自动获取
xxl.ExecutorPort("9999"), //默认9999(非必填)
xxl.RegistryKey("golang-jobs"), //执行器名称
xxl.SetLogger(&logger{}), //自定义日志
)
exec.Init()
//设置日志查看handler
exec.LogHandler(func(req *xxl.LogReq) *xxl.LogRes {
return &xxl.LogRes{Code: 200, Msg: "", Content: xxl.LogResContent{
FromLineNum: req.FromLineNum,
ToLineNum: 2,
LogContent: "这个是自定义日志handler",
IsEnd: true,
}}
})
//注册任务handler
exec.RegTask("task.test", task.Test)
exec.RegTask("task.test2", task.Test2)
exec.RegTask("task.panic", task.Panic)
log.Fatal(exec.Run())
}
//xxl.Logger接口实现
type logger struct{}
func (l *logger) Info(format string, a ...interface{}) {
fmt.Println(fmt.Sprintf("自定义日志 - "+format, a...))
}
func (l *logger) Error(format string, a ...interface{}) {
log.Println(fmt.Sprintf("自定义日志 - "+format, a...))
}
xxl-job-admin配置
添加执行器
执行器管理->新增执行器,执行器列表如下:
AppName 名称 注册方式 OnLine 机器地址 操作
golang-jobs golang执行器 自动注册 查看 ( 1 )
查看->注册节点
添加任务
任务管理->新增(注意,使用BEAN模式,JobHandler与RegTask名称一致)
1 测试panic BEAN:task.panic * 0 * * * ? admin STOP
2 测试耗时任务 BEAN:task.test2 * * * * * ? admin STOP
3 测试golang BEAN:task.test * * * * * ? admin STOP
SDK 源码地址
https://github.com/mousycoder/xxl-job-go-sdk
SDK 源码解析
1.初始化执行器信息(令牌,执行器名称,IP,端口,调度中心地址)
exec := xxl.NewExecutor(
xxl.ServerAddr("http://xxl-job.test.com:18088/xxl-job-admin"),
xxl.AccessToken(“xxxxx"), //请求令牌(默认为空)
//xxl.ExecutorIp("127.0.0.1"), //可自动获取
xxl.ExecutorPort("9999"), //默认9999(非必填)
xxl.RegistryKey("golang-jobs"), //执行器名称
xxl.SetLogger(&logger{}), //自定义日志
)
2.调用执行器注册接口(/api/registry)注册执行器到调度中心(20秒心跳防止过期)
func (e *executor) registry() {
t := time.NewTimer(time.Second * 0) //初始立即执行
defer t.Stop()
req := &Registry{
RegistryGroup: "EXECUTOR",
RegistryKey: e.opts.RegistryKey,
RegistryValue: "http://" + e.address,
}
param, err := json.Marshal(req)
if err != nil {
log.Fatal("执行器注册信息解析失败:" + err.Error())
}
for {
<-t.C
t.Reset(time.Second * time.Duration(20)) //20秒心跳防止过期
func() {
result, err := e.post("/api/registry", string(param))
if err != nil {
e.log.Error("执行器注册失败1:" + err.Error())
return
}
defer result.Body.Close()
body, err := ioutil.ReadAll(result.Body)
if err != nil {
e.log.Error("执行器注册失败2:" + err.Error())
return
}
res := &res{}
_ = json.Unmarshal(body, &res)
if res.Code != 200 {
e.log.Error("执行器注册失败3:" + string(body))
return
}
e.log.Info("执行器注册成功:" + string(body))
}()
}
}
3.设置日志查看 handler,用于任务调度中心远程查看执行器日志
exec.LogHandler(func(req *xxl.LogReq) *xxl.LogRes {
return &xxl.LogRes{Code: 200, Msg: "", Content: xxl.LogResContent{
FromLineNum: req.FromLineNum,
ToLineNum: 2,
LogContent: "这个是自定义日志handler",
IsEnd: true,
}}
})
4.注册任务 handler
exec.RegTask("task.test", task.Test)
exec.RegTask("task.test2", task.Test2)
exec.RegTask("task.panic", task.Panic)
加锁,放到内存中
func (t *taskList) Set(key string, val *Task) {
t.mu.Lock()
t.data[key] = val
t.mu.Unlock()
}
5.创建路由规则
// 创建路由器
mux := http.NewServeMux()
// 设置路由规则
mux.HandleFunc("/run", e.runTask)
mux.HandleFunc("/kill", e.killTask)
mux.HandleFunc("/log", e.taskLog)
runTask
流程:
1.解析调用中心发送的请求参数
2.加入到 runlist
3.根据executorHandler的key值找到对应的任务
4.运行任务
5.回调调度中心
6.从 runlist 中删除
7.返回给调度中心 OK
killTask
流程:
1.解析调用中心发送的请求参数
2.判断是否在runlist里
3.中止任务
4.从 runlist 中删除任务
5.返回给调度中心 OK
taskLog
流程:
1.解析调用中心发送的请求参数
2.找到对应任务的 logHandler
3.返回给调度中心 log 信息
6.创建服务器,监听端口
server := &http.Server{
Addr: e.address,
WriteTimeout: time.Second * 3,
Handler: mux,
}
// 监听端口并提供服务
e.log.Info("Starting server at " + e.address)
go server.ListenAndServe()
7.如果服务器意外退出,则移除注册信息
t := time.NewTimer(time.Second * 0) //初始立即执行
defer t.Stop()
req := &Registry{
RegistryGroup: "EXECUTOR",
RegistryKey: e.opts.RegistryKey,
RegistryValue: "http://" + e.address,
}
param, err := json.Marshal(req)
if err != nil {
e.log.Error("执行器摘除失败:" + err.Error())
}
res, err := e.post("/api/registryRemove", string(param))
if err != nil {
e.log.Error("执行器摘除失败:" + err.Error())
}
body, err := ioutil.ReadAll(res.Body)
e.log.Info("执行器摘除成功:" + string(body))
_ = res.Body.Close()
调度中心/执行器 RESTful API
XXL-JOB 目标是一种跨平台、跨语言的任务调度规范和协议。
针对Java应用,可以直接通过官方提供的调度中心与执行器,方便快速的接入和使用调度中心。
针对非Java应用,可借助 XXL-JOB 的标准 RESTful API 方便的实现多语言支持。
调度中心 RESTful API:
说明:调度中心提供给执行器使用的API;不局限于官方执行器使用,第三方可使用该API来实现执行器;
API列表:执行器注册、任务结果回调等;
执行器 RESTful API :
说明:执行器提供给调度中心使用的API;官方执行器默认已实现,第三方执行器需要实现并对接提供给调度中心;
API列表:任务触发、任务终止、任务日志查询……等;
此处 RESTful API 主要用于非Java语言定制个性化执行器使用,实现跨语言。除此之外,如果有需要通过API操作调度中心,可以个性化扩展 “调度中心 RESTful API” 并使用。
调度中心 RESTful API
a、任务回调
说明:执行器执行完任务后,回调任务结果时使用
地址格式:{调度中心跟地址}/callback
Header:
XXL-JOB-ACCESS-TOKEN : {请求令牌}
请求数据格式如下,放置在 RequestBody 中,JSON格式:
[{
"logId":1, // 本次调度日志ID
"logDateTim":0, // 本次调度日志时间
"executeResult":{
"code": 200, // 200 表示任务执行正常,500表示失败
"msg": null
}
}]
响应数据格式:
{
"code": 200, // 200 表示正常、其他失败
"msg": null // 错误提示消息
}
b、执行器注册
说明:执行器注册时使用,调度中心会实时感知注册成功的执行器并发起任务调度
地址格式:{调度中心跟地址}/registry
Header:
XXL-JOB-ACCESS-TOKEN : {请求令牌}
请求数据格式如下,放置在 RequestBody 中,JSON格式:
{
"registryGroup":"EXECUTOR", // 固定值
"registryKey":"xxl-job-executor-example", // 执行器AppName
"registryValue":"http://127.0.0.1:9999/" // 执行器地址,内置服务地址
}
响应数据格式:
{
"code": 200, // 200 表示正常、其他失败
"msg": null // 错误提示消息
}
c、执行器注册摘除
说明:执行器注册摘除时使用,注册摘除后的执行器不参与任务调度与执行
地址格式:{调度中心跟地址}/registryRemove
Header:
XXL-JOB-ACCESS-TOKEN : {请求令牌}
请求数据格式如下,放置在 RequestBody 中,JSON格式:
{
"registryGroup":"EXECUTOR", // 固定值
"registryKey":"xxl-job-executor-example", // 执行器AppName
"registryValue":"http://127.0.0.1:9999/" // 执行器地址,内置服务跟地址
}
响应数据格式:
{
"code": 200, // 200 表示正常、其他失败
"msg": null // 错误提示消息
}
执行器 RESTful API
a、心跳检测
说明:调度中心检测执行器是否在线时使用
地址格式:{执行器内嵌服务跟地址}/beat
Header:
XXL-JOB-ACCESS-TOKEN : {请求令牌}
请求数据格式如下,放置在 RequestBody 中,JSON格式:
响应数据格式:
{
"code": 200, // 200 表示正常、其他失败
"msg": null // 错误提示消息
}
b、忙碌检测
说明:调度中心检测指定执行器上指定任务是否忙碌(运行中)时使用
地址格式:{执行器内嵌服务跟地址}/idleBeat
Header:
XXL-JOB-ACCESS-TOKEN : {请求令牌}
请求数据格式如下,放置在 RequestBody 中,JSON格式:
{
"jobId":1 // 任务ID
}
响应数据格式:
{
"code": 200, // 200 表示正常、其他失败
"msg": null // 错误提示消息
}
c、触发任务
说明:触发任务执行
地址格式:{执行器内嵌服务跟地址}/run
Header:
XXL-JOB-ACCESS-TOKEN : {请求令牌}
请求数据格式如下,放置在 RequestBody 中,JSON格式:
{
"jobId":1, // 任务ID
"executorHandler":"demoJobHandler", // 任务标识
"executorParams":"demoJobHandler", // 任务参数
"executorBlockStrategy":"COVER_EARLY", // 任务阻塞策略,可选值参考 com.xxl.job.core.enums.ExecutorBlockStrategyEnum
"executorTimeout":0, // 任务超时时间,单位秒,大于零时生效
"logId":1, // 本次调度日志ID
"logDateTime":1586629003729, // 本次调度日志时间
"glueType":"BEAN", // 任务模式,可选值参考 com.xxl.job.core.glue.GlueTypeEnum
"glueSource":"xxx", // GLUE脚本代码
"glueUpdatetime":1586629003727, // GLUE脚本更新时间,用于判定脚本是否变更以及是否需要刷新
"broadcastIndex":0, // 分片参数:当前分片
"broadcastTotal":0 // 分片参数:总分片
}
响应数据格式:
{
"code": 200, // 200 表示正常、其他失败
"msg": null // 错误提示消息
}
f、终止任务
说明:终止任务
地址格式:{执行器内嵌服务跟地址}/kill
Header:
XXL-JOB-ACCESS-TOKEN : {请求令牌}
请求数据格式如下,放置在 RequestBody 中,JSON格式:
{
"jobId":1 // 任务ID
}
响应数据格式:
{
"code": 200, // 200 表示正常、其他失败
"msg": null // 错误提示消息
}
d、查看执行日志
说明:终止任务,滚动方式加载
地址格式:{执行器内嵌服务跟地址}/log
Header:
XXL-JOB-ACCESS-TOKEN : {请求令牌}
请求数据格式如下,放置在 RequestBody 中,JSON格式:
{
"logDateTim":0, // 本次调度日志时间
"logId":0, // 本次调度日志ID
"fromLineNum":0 // 日志开始行号,滚动加载日志
}
响应数据格式:
{
"code":200, // 200 表示正常、其他失败
"msg": null // 错误提示消息
"content":{
"fromLineNum":0, // 本次请求,日志开始行数
"toLineNum":100, // 本次请求,日志结束行号
"logContent":"xxx", // 本次请求日志内容
"isEnd":true // 日志是否全部加载完
}
}
最后的彩蛋:
XXL-JOB-PLUS: XXL-JOB的加强版
https://github.com/mousycoder/xxl-job-plus
公众号:
暂无答案!
目前还没有任何答案,快来回答吧!