*/ public class JobThread extends Thread{ private static Logger logger = LoggerFactory.getLogger(JobThread.class);
private int jobId; private IJobHandler handler; private LinkedBlockingQueue triggerQueue; private Set triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID
10条答案
按热度按时间zbq4xfa01#
给你手写一个。
332nm8kg2#
jobId很好加,但是会对其它模块造成影响,就是想请问下你这边的啥场景需要用这个?
3zwjbxry3#
package com.xxl.job.core.handler.annotation;
import java.lang.annotation.*;
/**
*/
@target({ElementType.METHOD})
@retention(RetentionPolicy.RUNTIME)
@inherited
public @interface XxlJob {
/**
*/
String value();
/**
*/
String init() default "";
/**
*/
String destroy() default "";
}
3zwjbxry4#
package com.xxl.job.core.executor.impl;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.glue.GlueFactory;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.handler.impl.MethodJobHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.annotation.AnnotatedElementUtils;
import java.lang.reflect.Method;
import java.util.Map;
/**
*/
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);
// start
@OverRide
public void afterSingletonsInstantiated() {
}
// destroy
@OverRide
public void destroy() {
super.destroy();
}
/*private void initJobHandlerRepository(ApplicationContext applicationContext) {
if (applicationContext == null) {
return;
}
}*/
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
if (applicationContext == null) {
return;
}
// init job handler from method
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
for (String beanDefinitionName : beanDefinitionNames) {//遍历每个容器对象
Object bean = applicationContext.getBean(beanDefinitionName);
}
// ---------------------- applicationContext ----------------------
private static ApplicationContext applicationContext;
@OverRide
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
}
v9tzhpje5#
package com.xxl.job.core.handler.impl;
import com.xxl.job.core.handler.IJobHandler;
import java.lang.reflect.Method;
/**
*/
public class MethodJobHandler extends IJobHandler {
private final Object target; //目标对象
private final Method method; //调度器方法
private Method initMethod; //初始方法
private Method destroyMethod; //销毁方法
public MethodJobHandler(Object target, Method method, Method initMethod, Method destroyMethod) {
this.target = target;
this.method = method;
}
@OverRide
public void execute() throws Exception {
Class<?>[] paramTypes = method.getParameterTypes();
if (paramTypes.length > 0) {
method.invoke(target, new Object[paramTypes.length]); // method-param can not be primitive-types
} else {
method.invoke(target);
}
}
@OverRide
public void init() throws Exception {
if(initMethod != null) {
initMethod.invoke(target);
}
}
@OverRide
public void destroy() throws Exception {
if(destroyMethod != null) {
destroyMethod.invoke(target);
}
}
@OverRide
public String toString() {
return super.toString()+"["+ target.getClass() + "#" + method.getName() +"]";
}
}
bjg7j2ky6#
package com.xxl.job.core.thread;
import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.*;
/**
*/
public class JobThread extends Thread{
private static Logger logger = LoggerFactory.getLogger(JobThread.class);
private int jobId;
private IJobHandler handler;
private LinkedBlockingQueue triggerQueue;
private Set triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID
private volatile boolean toStop = false;
private String stopReason;
private boolean running = false; // if running job
private int idleTimes = 0; // idel times
public JobThread(int jobId, IJobHandler handler) {
this.jobId = jobId;
this.handler = handler;
this.triggerQueue = new LinkedBlockingQueue();
this.triggerLogIdSet = Collections.synchronizedSet(new HashSet());
}
public IJobHandler getHandler() {
return handler;
}
/**
*/
public ReturnT pushTriggerQueue(TriggerParam triggerParam) {
// avoid repeat 若包含,则说明重复执行
if (triggerLogIdSet.contains(triggerParam.getLogId())) {
logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
return new ReturnT(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
}
triggerLogIdSet.add(triggerParam.getLogId());
triggerQueue.add(triggerParam);
return ReturnT.SUCCESS;
}
/**
public void toStop(String stopReason) {
/**
*/
this.toStop = true;
this.stopReason = stopReason;
}
/**
*/
public boolean isRunningOrHasQueue() {
return running || triggerQueue.size()>0;
}
@OverRide
public void run() {
}
}
zmeyuzjn7#
package com.xxl.job.core.handler;
/**
*/
public abstract class IJobHandler {
/**
*/
public abstract void execute() throws Exception;
/@deprecated
public abstract ReturnT execute(String param) throws Exception;/
/**
*/
public void init() throws Exception {
// do something
}
/**
*/
public void destroy() throws Exception {
// do something
}
}
sczxawaw8#
package com.xxl.job.core.handler.impl;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.IJobHandler;
/**
*/
public class GlueJobHandler extends IJobHandler {
private long glueUpdatetime;
private IJobHandler jobHandler;
public GlueJobHandler(IJobHandler jobHandler, long glueUpdatetime) {
this.jobHandler = jobHandler;
this.glueUpdatetime = glueUpdatetime;
}
public long getGlueUpdatetime() {
return glueUpdatetime;
}
@OverRide
public void execute() throws Exception {
XxlJobHelper.log("----------- glue.version:"+ glueUpdatetime +" -----------");
jobHandler.execute();
}
@OverRide
public void init() throws Exception {
this.jobHandler.init();
}
@OverRide
public void destroy() throws Exception {
this.jobHandler.destroy();
}
}
vq8itlhq9#
使用demo ,添加isNeedJobId =true,则传入jobId。默认不传jobId
@xxljob(value="demoJobHandler",init="init",isNeedJobId = true)
public void demoJobHandler() throws Exception {
XxlJobHelper.log("XXL-JOB, Hello World.");
iyr7buue10#
内部封装了上下文
XxlJobHelper.getJobId()
但是只能执行调度方法才能调用.
/**
*/
@xxljob("demoJobHandler")
public void demoJobHandler() throws Exception {
XxlJobHelper.getJobId();
}