springboot:通过http动态操作xxl-job任务

x33g5p2x  于2022-08-17 转载在 Spring  
字(19.7k)|赞(0)|评价(0)|浏览(1232)

springboot:通过http动态操作xxl-job任务

一、maven依赖

<dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp</artifactId>
            <version>3.8.1</version>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.6.4</version>
        </dependency>

        <dependency>
            <groupId>com.xuxueli</groupId>
            <artifactId>xxl-job-core</artifactId>
            <version>2.2.0</version>
        </dependency>

		<dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>

二、配置文件

xxl:
  job:
    login:
      address: http://192.168.31.91:18080/xxl-job-admin
      username: admin
      password: 123456

三、xxl-job实体类

XxlJobActuatorManagerInfo

@Data
@NoArgsConstructor
@AllArgsConstructor
public class XxlJobActuatorManagerInfo {

    private Integer recordsFiltered;
    private Integer recordsTotal;
    private List<XxlJobGroup> data;
}

XxlJobGroup

@Data
@NoArgsConstructor
@AllArgsConstructor
public class XxlJobGroup {

    private int id;
    private String appname;
    private String title;
    private int addressType;        // 执行器地址类型:0=自动注册、1=手动录入
    private String addressList;     // 执行器地址列表,多地址逗号分隔(手动录入)
    private Date updateTime;

    // registry list
    private List<String> registryList;  // 执行器地址列表(系统注册)
    public List<String> getRegistryList() {
        if (addressList!=null && addressList.trim().length()>0) {
            registryList = new ArrayList<String>(Arrays.asList(addressList.split(",")));
        }
        return registryList;
    }
}

XxlJobInfo

@Data
@NoArgsConstructor
@AllArgsConstructor
public class XxlJobInfo {
	
	private int id;				// 主键ID
	
	private int jobGroup;		// 执行器主键ID
	private String jobDesc;
	private String jobCron;		//corn表达式
	
	private Date addTime;
	private Date updateTime;
	
	private String author;		// 负责人
	private String alarmEmail;	// 报警邮件

	private String scheduleType;			// 调度类型
	private String scheduleConf;			// 调度配置,值含义取决于调度类型
	private String misfireStrategy;			// 调度过期策略

	private String executorRouteStrategy;	// 执行器路由策略
	private String executorHandler;		    // 执行器,任务Handler名称
	private String executorParam;		    // 执行器,任务参数
	private String executorBlockStrategy;	// 阻塞处理策略
	private int executorTimeout;     		// 任务执行超时时间,单位秒
	private int executorFailRetryCount;		// 失败重试次数
	
	private String glueType;		// GLUE类型	#com.xxl.job.core.glue.GlueTypeEnum
	private String glueSource;		// GLUE源代码
	private String glueRemark;		// GLUE备注
	private Date glueUpdatetime;	// GLUE更新时间

	private String childJobId;		// 子任务ID,多个逗号分隔

	private int triggerStatus;		// 调度状态:0-停止,1-运行
	private long triggerLastTime;	// 上次调度时间
	private long triggerNextTime;	// 下次调度时间
}

XxlJobResponseInfo

@Data
@NoArgsConstructor
@AllArgsConstructor
public class XxlJobResponseInfo {

    private Integer code;
    private String msg;
    private String content;
}

XxlJobTaskManagerInfo

@Data
@NoArgsConstructor
@AllArgsConstructor
public class XxlJobTaskManagerInfo {

    private Integer recordsFiltered;
    private Integer recordsTotal;
    private List<XxlJobInfo> data;
}

四、工具类

HttpClientConfig

@Data
@NoArgsConstructor
@AllArgsConstructor
public class HttpClientConfig {

    private String url;
    private String username;
    private String password;
    private String oauthToken;
    private int connectionTimeout = 60000;
    private int requestTimeout = 60000;
    private int webSocketPingInterval;
    private int maxConcurrentRequestsPerHost = 30;
    private int maxConnection = 40;

    private String httpProxy;
    private String httpsProxy;
    private String proxyUsername;
    private String proxyPassword;
    private String userAgent;
    private TlsVersion[] tlsVersions = new TlsVersion[]{TLS_1_2};
    private String[] noProxy;

    public static final String HTTP_PROTOCOL_PREFIX = "http://";
    public static final String HTTPS_PROTOCOL_PREFIX = "https://";
    
}

HttpClientUtils

package com.mye.xxljobtest.util;

import okhttp3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpMethod;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.Proxy;
import java.net.URL;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static okhttp3.ConnectionSpec.CLEARTEXT;

public class HttpClientUtils {

    private static final Logger logger = LoggerFactory.getLogger(HttpClientUtils.class);

    /**
     * json传输方式
     */
    private static final MediaType JSON = MediaType.parse("application/json; charset=utf-8");

    private static volatile OkHttpClient client;

    public static Headers doLoginRequest(HttpClientConfig config, Map<String, String> params) {
        try {
            OkHttpClient client = getInstance(config);
            FormBody.Builder builder = new FormBody.Builder();
            for (Map.Entry<String, String> param : params.entrySet()) {
                builder.add(param.getKey(), param.getValue());
            }
            FormBody formBody = builder.build();
            Request request = new Request.Builder().url(config.getUrl()).post(formBody).build();
            Response response = client.newCall(request).execute();
            if (response.isSuccessful() && response.body() != null) {
                System.out.println(JsonUtil.objectToJson(response));
                return response.headers();
            } else if (response.body() != null){
                return null;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static String doFormRequest(HttpClientConfig config, Map<String, String> params, String cookie) {
        try {
            OkHttpClient client = getInstance(config);
            FormBody.Builder builder = new FormBody.Builder();
            for (Map.Entry<String, String> param : params.entrySet()) {
                builder.add(param.getKey(), param.getValue());
            }
            FormBody formBody = builder.build();
            Request request = new Request.Builder().url(config.getUrl()).header("Cookie", cookie).post(formBody).build();
            Response response = client.newCall(request).execute();
            if (response.isSuccessful() && response.body() != null) {
                System.out.println(JsonUtil.objectToJson(response));
                return response.body().string();
            } else if (response.body() != null){
                return response.body().string();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static String doRequest(HttpClientConfig config, HttpMethod method, Map<String, String> headers, String body) {
        try {
            OkHttpClient client = getInstance(config);
            //创建请求
            RequestBody requestBody = RequestBody.create(JSON, StringUtils.isEmpty(body) ? "" : body);

            Request.Builder builder = new Request.Builder();
            if (!CollectionUtils.isEmpty(headers)) {
                logger.info("headers : " + headers);
                builder.headers(Headers.of(headers));
            }

            Request request = builder.method(method.name(), requestBody).url(config.getUrl()).build();

            Response response = client.newCall(request).execute();
            if (response.isSuccessful() && response.body() != null) {
                return response.body().string();
            } else if (response.body() != null){
                return response.body().string();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 双重检查单例
     * @param config OkHttpClient配置
     * @return okHttpClient
     */
    public static OkHttpClient getInstance(HttpClientConfig config) {
        if (client == null) {
            synchronized (OkHttpClient.class) {
                if (client == null) {
                    client = createHttpClient(config);
                }
            }
        }
        //拿到client之后把认证信息重新加一遍
        client.newBuilder().addInterceptor(chain -> {
            Request request = chain.request();
            if (StringUtils.hasText(config.getUsername()) && StringUtils.hasText(config.getPassword())) {
                Request authReq = chain.request().newBuilder().addHeader("Authorization", Credentials.basic(config.getUsername(), config.getPassword())).build();
                return chain.proceed(authReq);
            } else if (StringUtils.hasText( config.getOauthToken())) {
                Request authReq = chain.request().newBuilder().addHeader("Authorization", "Bearer " + config.getOauthToken()).build();
                return chain.proceed(authReq);
            }
            return chain.proceed(request);
        });
        return client;
    }

    private static OkHttpClient createHttpClient(final HttpClientConfig config) {
        try {
            OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
            httpClientBuilder.followRedirects(true);
            httpClientBuilder.followSslRedirects(true);
            if (config.getConnectionTimeout() > 0) {
                httpClientBuilder.connectTimeout(config.getConnectionTimeout(), TimeUnit.MILLISECONDS);
            }
            if (config.getRequestTimeout() > 0) {
                httpClientBuilder.readTimeout(config.getRequestTimeout(), TimeUnit.MILLISECONDS);
            }
            if (config.getWebSocketPingInterval() > 0) {
                httpClientBuilder.pingInterval(config.getWebSocketPingInterval(), TimeUnit.MILLISECONDS);
            }
            if (config.getMaxConcurrentRequestsPerHost() > 0) {
                Dispatcher dispatcher = new Dispatcher();
                dispatcher.setMaxRequestsPerHost(config.getMaxConcurrentRequestsPerHost());
                httpClientBuilder.dispatcher(dispatcher);
            }
            if (config.getMaxConnection() > 0) {
                ConnectionPool connectionPool = new ConnectionPool(config.getMaxConnection(), 60, TimeUnit.SECONDS);
                httpClientBuilder.connectionPool(connectionPool);
            }
            // Only check proxy if it's a full URL with protocol
            if (config.getUrl().toLowerCase().startsWith(HttpClientConfig.HTTP_PROTOCOL_PREFIX) || config.getUrl().startsWith(HttpClientConfig.HTTPS_PROTOCOL_PREFIX)) {
                try {
                    URL proxyUrl = getProxyUrl(config);
                    if (proxyUrl != null) {
                        httpClientBuilder.proxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyUrl.getHost(), proxyUrl.getPort())));
                        if (config.getProxyUsername() != null) {
                            httpClientBuilder.proxyAuthenticator((route, response) -> {
                                String credential = Credentials.basic(config.getProxyUsername(), config.getProxyPassword());
                                return response.request().newBuilder().header("Proxy-Authorization", credential).build();
                            });
                        }
                    }
                } catch (MalformedURLException e) {
                    throw new IllegalArgumentException("Invalid proxy server configuration", e);
                }
            }
            if (config.getUserAgent() != null && !config.getUserAgent().isEmpty()) {
                httpClientBuilder.addNetworkInterceptor(chain -> {
                    Request agent = chain.request().newBuilder().header("User-Agent", config.getUserAgent()).build();
                    return chain.proceed(agent);
                });
            }
            if (config.getTlsVersions() != null && config.getTlsVersions().length > 0) {
                ConnectionSpec spec = new ConnectionSpec.Builder(ConnectionSpec.MODERN_TLS)
                        .tlsVersions(config.getTlsVersions())
                        .build();
                httpClientBuilder.connectionSpecs(Arrays.asList(spec, CLEARTEXT));
            }
            return httpClientBuilder.build();
        } catch (Exception e) {
            throw new IllegalArgumentException("创建OKHTTPClient错误", e);
        }
    }

    private static URL getProxyUrl(HttpClientConfig config) throws MalformedURLException {
        URL master = new URL(config.getUrl());
        String host = master.getHost();
        if (config.getNoProxy() != null) {
            for (String noProxy : config.getNoProxy()) {
                if (host.endsWith(noProxy)) {
                    return null;
                }
            }
        }
        String proxy = config.getHttpsProxy();
        String http = "http";
        if (http.equals(master.getProtocol())) {
            proxy = config.getHttpProxy();
        }
        if (proxy != null) {
            return new URL(proxy);
        }
        return null;
    }

}

XxlJobApiUtils

package com.mye.xxljobtest.util;

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.json.JSONUtil;
import com.mye.xxljobtest.jobcore.*;
import okhttp3.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * xxl-job api 操作工具类
 * @author hl
 * @date 2022/8/1 14:05
 */
@Component
public class XxlJobApiUtils {

    private static final Logger logger = LoggerFactory.getLogger(XxlJobApiUtils.class);

    @Value("${xxl.job.login.address}")
    private String xxlJobLoginAddress;

    @Value("${xxl.job.login.username}")
    private String xxlJobLoginUserName;

    @Value("${xxl.job.login.password}")
    private String xxlJobLoginPassword;

    /**
     * 启动xxl-job任务管理
     * @param taskId 任务管理id
     */
    public void startTask(Integer taskId){
        //获取登录cookie
        HttpClientConfig clientConfig = new HttpClientConfig();
        String cookie = loginTaskCenter(clientConfig);

        //创建参数
        Map<String, String> form = new HashMap<>();
        form.put("id", "" + taskId);

        clientConfig.setUrl(xxlJobLoginAddress + "/jobinfo/start");
        String result = HttpClientUtils.doFormRequest(clientConfig, form, cookie);
        XxlJobResponseInfo info = JSONUtil.toBean(JSONUtil.parseObj(result), XxlJobResponseInfo.class);

        if (ObjectUtil.isNull(info) || info.getCode() != HttpStatus.OK.value()){
            logger.error(info.getMsg(),new RuntimeException());
        }
    }

    /**
     * 删除 xxl-job任务管理
     * @param id 任务id
     */
    public void deleteTask(Integer id){
        //获取登录cookie
        HttpClientConfig clientConfig = new HttpClientConfig();
        String cookie = loginTaskCenter(clientConfig);

        //创建任务管理参数
        Map<String, String> form = new HashMap<>();
        form.put("id", id + "");

        clientConfig.setUrl(xxlJobLoginAddress + "/jobinfo/remove");
        String result = HttpClientUtils.doFormRequest(clientConfig, form, cookie);

        XxlJobResponseInfo info = JSONUtil.toBean(JSONUtil.parseObj(result), XxlJobResponseInfo.class);

        if (ObjectUtil.isNull(info) || info.getCode() != HttpStatus.OK.value()){
            logger.error(info.getMsg(),new RuntimeException());
        }
    }

    /**
     * 编辑 xxl-job任务管理
     * @param xxlJobInfo 查询参数
     */
    public void editTask(XxlJobInfo xxlJobInfo){
        //获取登录cookie
        HttpClientConfig clientConfig = new HttpClientConfig();
        String cookie = loginTaskCenter(clientConfig);

        //创建任务管理参数
        Map<String, String> form = new HashMap<>();
        form.put("id",xxlJobInfo.getId() + "");
        form.put("jobGroup", xxlJobInfo.getJobGroup() + "");
        form.put("jobDesc", xxlJobInfo.getJobDesc());
        form.put("executorRouteStrategy", "ROUND");
        form.put("jobCron", xxlJobInfo.getJobCorn());
        form.put("glueType", "BEAN");
        form.put("executorHandler", xxlJobInfo.getExecutorHandler());
        form.put("executorBlockStrategy", "SERIAL_EXECUTION");
        form.put("author", "mye");

        clientConfig.setUrl(xxlJobLoginAddress + "/jobinfo/update");
        String result = HttpClientUtils.doFormRequest(clientConfig, form, cookie);
        XxlJobResponseInfo info = JSONUtil.toBean(JSONUtil.parseObj(result), XxlJobResponseInfo.class);

        if (ObjectUtil.isNull(info) || info.getCode() != HttpStatus.OK.value()){
            logger.error(info.getMsg(),new RuntimeException());
        }
    }

    /**
     * 查询所有的task
     * @param xxlJobInfo
     * @return
     */
    public XxlJobTaskManagerInfo selectAllTask(XxlJobInfo xxlJobInfo) {
        //获取登录cookie
        HttpClientConfig clientConfig = new HttpClientConfig();
        String cookie = loginTaskCenter(clientConfig);

        //创建任务管理参数
        Map<String, String> form = new HashMap<>();
        form.put("jobGroup", xxlJobInfo.getJobGroup() + "");
        form.put("triggerStatus", "-1");

        clientConfig.setUrl(xxlJobLoginAddress + "/jobinfo/pageList");
        String result = HttpClientUtils.doFormRequest(clientConfig, form, cookie);
        return JSONUtil.toBean(JSONUtil.parseObj(result), XxlJobTaskManagerInfo.class);
    }

    /**
     * 查询 xxl-job任务管理
     * @param xxlJobInfo 查询参数
     */
    public XxlJobTaskManagerInfo selectTask(XxlJobInfo xxlJobInfo){
        //获取登录cookie
        HttpClientConfig clientConfig = new HttpClientConfig();
        String cookie = loginTaskCenter(clientConfig);

        //创建任务管理参数
        Map<String, String> form = new HashMap<>();
        form.put("jobGroup", xxlJobInfo.getJobGroup() + "");
        form.put("jobDesc", xxlJobInfo.getJobDesc());
        form.put("executorHandler", xxlJobInfo.getExecutorHandler());
        form.put("author", xxlJobInfo.getAuthor());
        form.put("triggerStatus", "-1");

        clientConfig.setUrl(xxlJobLoginAddress + "/jobinfo/pageList");
        String result = HttpClientUtils.doFormRequest(clientConfig, form, cookie);
        XxlJobTaskManagerInfo info = JSONUtil.toBean(JSONUtil.parseObj(result), XxlJobTaskManagerInfo.class);

        if (ObjectUtil.isNull(info) || CollectionUtil.isEmpty(info.getData())){
            logger.error("xxl-job任务管理不存在",new RuntimeException());
        }

        return info;
    }

    /**
     * 创建任务管理
     * @param xxlJobInfo 创建参数
     */
    public XxlJobResponseInfo createTask(XxlJobInfo xxlJobInfo){
        //获取登录cookie
        HttpClientConfig clientConfig = new HttpClientConfig();
        String cookie = loginTaskCenter(clientConfig);

        //创建任务管理参数
        Map<String, String> form = new HashMap<>();

        form.put("jobGroup", xxlJobInfo.getJobGroup() + "");
        form.put("jobDesc", xxlJobInfo.getJobDesc());
        form.put("executorRouteStrategy", "ROUND");
        form.put("jobCron", xxlJobInfo.getJobCorn());
        form.put("glueType", "BEAN");
        form.put("executorHandler", xxlJobInfo.getExecutorHandler());
        form.put("executorBlockStrategy", "SERIAL_EXECUTION");
        form.put("author", "mye");

        //创建任务管理
        clientConfig.setUrl(xxlJobLoginAddress + "/jobinfo/add");
        String result = HttpClientUtils.doFormRequest(clientConfig, form, cookie);
        XxlJobResponseInfo info = JSONUtil.toBean(JSONUtil.parseObj(result), XxlJobResponseInfo.class);

        if (ObjectUtil.isNull(info) || info.getCode() != HttpStatus.OK.value()){
            logger.error(info.getMsg(),new RuntimeException());
        }

        return info;
    }

    /**
     * 删除执行器
     */
    public void deleteActuator(XxlJobGroup xxlJobGroup) {
        //获取登录cookie
        HttpClientConfig clientConfig = new HttpClientConfig();
        String cookie = loginTaskCenter(clientConfig);

        //创建查询执行器管理器参数
        Map<String, String> form = new HashMap<>();
        form.put("id", xxlJobGroup.getId() + "");

        //创建执行器管理器地址
        clientConfig.setUrl(xxlJobLoginAddress + "/jobgroup/remove");
        String result = HttpClientUtils.doFormRequest(clientConfig, form, cookie);

        XxlJobResponseInfo info = JSONUtil.toBean(JSONUtil.parseObj(result), XxlJobResponseInfo.class);
        if (ObjectUtil.isNull(info) || info.getCode() != HttpStatus.OK.value()) {
            logger.error(info.getMsg(), new RuntimeException());
        }
    }

    /**
     * 编辑执行器
     */
    public void editActuator(XxlJobGroup xxlJobGroup){
        //获取登录cookie
        HttpClientConfig clientConfig = new HttpClientConfig();
        String cookie = loginTaskCenter(clientConfig);

        //创建查询执行器管理器参数
        Map<String, String> form = new HashMap<>();
        form.put("appname", xxlJobGroup.getAppname());
        form.put("title", xxlJobGroup.getTitle());
        form.put("addressType", xxlJobGroup.getAddressType() + "");
        form.put("id", xxlJobGroup.getId() + "");

        //创建执行器管理器地址
        clientConfig.setUrl(xxlJobLoginAddress + "/jobgroup/update");
        String result = HttpClientUtils.doFormRequest(clientConfig, form, cookie);

        XxlJobResponseInfo info = JSONUtil.toBean(JSONUtil.parseObj(result), XxlJobResponseInfo.class);
        if (ObjectUtil.isNull(info) || info.getCode() != HttpStatus.OK.value()){
            logger.error(info.getMsg(),new RuntimeException());
        }
    }

    /**
     * 查询执行器 (appname 和 title 都是模糊查询)
     * @param xxlJobGroup XxlJobGroup
     * @return xxlJobGroup 集合
     */
    public List<XxlJobGroup> selectActuator(XxlJobGroup xxlJobGroup){
        //获取登录cookie
        HttpClientConfig clientConfig = new HttpClientConfig();
        String cookie = loginTaskCenter(clientConfig);

        //创建查询执行器管理器参数
        Map<String, String> form = new HashMap<>();
        form.put("appname", xxlJobGroup.getAppname());
        form.put("title", xxlJobGroup.getTitle());

        //创建执行器管理器地址
        clientConfig.setUrl(xxlJobLoginAddress + "/jobgroup/pageList");
        String result = HttpClientUtils.doFormRequest(clientConfig, form, cookie);

        XxlJobActuatorManagerInfo info = JSONUtil.toBean(JSONUtil.parseObj(result), XxlJobActuatorManagerInfo.class);
        if (CollectionUtil.isEmpty(info.getData())){
            throw new RuntimeException("该执行器管理器不存在:" + xxlJobGroup.getAppname());
        }

        return info.getData();
    }

    /**
     * 创建执行器
     *
     * @param xxlJobGroup 创建参数
     */
    public XxlJobResponseInfo createActuator(XxlJobGroup xxlJobGroup) {
        //获取登录cookie
        HttpClientConfig clientConfig = new HttpClientConfig();
        String cookie = loginTaskCenter(clientConfig);

        //创建执行器管理器参数
        Map<String, String> form = new HashMap<>();
        form.put("appname", xxlJobGroup.getAppname());
        form.put("title", xxlJobGroup.getTitle());
        form.put("addressType", xxlJobGroup.getAddressType() + "");

        //创建执行器管理器地址
        clientConfig.setUrl(xxlJobLoginAddress + "/jobgroup/save");
        String result = HttpClientUtils.doFormRequest(clientConfig, form, cookie);
        XxlJobResponseInfo info = JSONUtil.toBean(JSONUtil.parseObj(result), XxlJobResponseInfo.class);

        if (ObjectUtil.isNull(info) || info.getCode() != HttpStatus.OK.value()){
            logger.error(info.getMsg(),new RuntimeException());
        }
        return info;
    }

    /**
     * 登录任务调度平台
     *
     * @param clientConfig clientConfig
     * @return cookie
     */
    public String loginTaskCenter(HttpClientConfig clientConfig) {
        Map<String, String> loginForm = new HashMap<>();
        clientConfig.setUrl(xxlJobLoginAddress + "/login");
        clientConfig.setUsername(xxlJobLoginUserName);
        clientConfig.setPassword(xxlJobLoginPassword);
        loginForm.put("userName", xxlJobLoginUserName);
        loginForm.put("password", xxlJobLoginPassword);
        Headers headers = HttpClientUtils.doLoginRequest(clientConfig, loginForm);
        assert headers != null;
        return headers.get("Set-Cookie");
    }
}

相关文章