xxl-job init 方法中无法获取jobId及相关参数获取?

ttvkxqim  于 2022-04-21  发布在  Java
关注(0)|答案(10)|浏览(419)

init 方法中无法获取jobId及相关参数获取,希望后续版本可以支持

zbq4xfa0

zbq4xfa01#

给你手写一个。

332nm8kg

332nm8kg2#

jobId很好加,但是会对其它模块造成影响,就是想请问下你这边的啥场景需要用这个?

3zwjbxry

3zwjbxry3#

package com.xxl.job.core.handler.annotation;

import java.lang.annotation.*;

/**

  • annotation for method jobhandler
  • @author xuxueli 2019-12-11 20:50:13

*/
@target({ElementType.METHOD})
@retention(RetentionPolicy.RUNTIME)
@inherited
public @interface XxlJob {

/**

  • jobhandler name

*/
String value();

/**

  • init handler, invoked when JobThread init

*/
String init() default "";

/**

  • destroy handler, invoked when JobThread destroy

*/
String destroy() default "";

}

3zwjbxry

3zwjbxry4#

package com.xxl.job.core.executor.impl;

import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.glue.GlueFactory;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.handler.impl.MethodJobHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.annotation.AnnotatedElementUtils;

import java.lang.reflect.Method;
import java.util.Map;

/**

  • xxl-job executor (for spring)
  • @author xuxueli 2018-11-01 09:24:52

*/
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);

// start
@OverRide
public void afterSingletonsInstantiated() {

// init JobHandler Repository
 /*initJobHandlerRepository(applicationContext);*/

 // init JobHandler Repository (for method)   初始化调度器资源管理器
 initJobHandlerMethodRepository(applicationContext);

 // refresh GlueFactory 刷新GlueFactory
 GlueFactory.refreshInstance(1);

 // super start
 try {
     //启动
     super.start();
 } catch (Exception e) {
     throw new RuntimeException(e);
 }

}

// destroy
@OverRide
public void destroy() {
super.destroy();
}

/*private void initJobHandlerRepository(ApplicationContext applicationContext) {
if (applicationContext == null) {
return;
}

// init job handler action
 Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);

 if (serviceBeanMap != null && serviceBeanMap.size() > 0) {
     for (Object serviceBean : serviceBeanMap.values()) {
         if (serviceBean instanceof IJobHandler) {
             String name = serviceBean.getClass().getAnnotation(JobHandler.class).value();
             IJobHandler handler = (IJobHandler) serviceBean;
             if (loadJobHandler(name) != null) {
                 throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
             }
             registJobHandler(name, handler);
         }
     }
 }

}*/

private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
if (applicationContext == null) {
return;
}
// init job handler from method
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
for (String beanDefinitionName : beanDefinitionNames) {//遍历每个容器对象
Object bean = applicationContext.getBean(beanDefinitionName);

Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
     try {
         //获取每个注解XxlJob方法
         annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                 new MethodIntrospector.MetadataLookup<XxlJob>() {
                     @Override
                     public XxlJob inspect(Method method) {
                         return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                     }
                 });
     } catch (Throwable ex) {
         logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
     }
     if (annotatedMethods==null || annotatedMethods.isEmpty()) {
         continue;
     }
     //遍历标记了XxlJob注解的方法
     for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
         Method executeMethod = methodXxlJobEntry.getKey();
         XxlJob xxlJob = methodXxlJobEntry.getValue();
         if (xxlJob == null) {
             continue;
         }

         //获取配置xxjob的触发器名称
         String name = xxlJob.value();
         if (name.trim().length() == 0) {
             throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
         }
         //工作处理资源库是否有相同命名
         if (loadJobHandler(name) != null) {
             throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
         }

         // execute method
         /*if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
             throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                     "The correct method format like \" public ReturnT<String> execute(String param) \" .");
         }
         if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
             throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                     "The correct method format like \" public ReturnT<String> execute(String param) \" .");
         }*/

         //设置可访问,设置后可通过反射调用私有方法
         executeMethod.setAccessible(true);

         // init and destory
         Method initMethod = null;
         Method destroyMethod = null;

         if (xxlJob.init().trim().length() > 0) {
             try {  //获取XxlJob标记的方法,配置的init方法
                 initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
                 initMethod.setAccessible(true);
             } catch (NoSuchMethodException e) {
                 throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
             }
         }
         if (xxlJob.destroy().trim().length() > 0) {
             try {//获取XxlJob标记的方法,配置的destroy方法
                 destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
                 destroyMethod.setAccessible(true);
             } catch (NoSuchMethodException e) {
                 throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
             }
         }

         // registry jobhandler 将xxljob配置的jobname作为key,对象,反射的执行,初始,销毁方法作为value注册jobHandlerRepository中
         registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
     }
 }

}

// ---------------------- applicationContext ----------------------
private static ApplicationContext applicationContext;

@OverRide
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

public static ApplicationContext getApplicationContext() {
return applicationContext;
}

}

v9tzhpje

v9tzhpje5#

package com.xxl.job.core.handler.impl;

import com.xxl.job.core.handler.IJobHandler;

import java.lang.reflect.Method;

/**

  • @author xuxueli 2019-12-11 21:12:18

*/
public class MethodJobHandler extends IJobHandler {

private final Object target; //目标对象
private final Method method; //调度器方法
private Method initMethod; //初始方法
private Method destroyMethod; //销毁方法

public MethodJobHandler(Object target, Method method, Method initMethod, Method destroyMethod) {
this.target = target;
this.method = method;

this.initMethod = initMethod;
 this.destroyMethod = destroyMethod;

}

@OverRide
public void execute() throws Exception {
Class<?>[] paramTypes = method.getParameterTypes();
if (paramTypes.length > 0) {
method.invoke(target, new Object[paramTypes.length]); // method-param can not be primitive-types
} else {
method.invoke(target);
}
}

@OverRide
public void init() throws Exception {
if(initMethod != null) {
initMethod.invoke(target);
}
}

@OverRide
public void destroy() throws Exception {
if(destroyMethod != null) {
destroyMethod.invoke(target);
}
}

@OverRide
public String toString() {
return super.toString()+"["+ target.getClass() + "#" + method.getName() +"]";
}
}

bjg7j2ky

bjg7j2ky6#

package com.xxl.job.core.thread;

import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.*;

/**

  • handler thread
  • @author xuxueli 2016-1-16 19:52:47

*/
public class JobThread extends Thread{
private static Logger logger = LoggerFactory.getLogger(JobThread.class);

private int jobId;
private IJobHandler handler;
private LinkedBlockingQueue triggerQueue;
private Set triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID

private volatile boolean toStop = false;
private String stopReason;

private boolean running = false; // if running job
private int idleTimes = 0; // idel times

public JobThread(int jobId, IJobHandler handler) {
this.jobId = jobId;
this.handler = handler;
this.triggerQueue = new LinkedBlockingQueue();
this.triggerLogIdSet = Collections.synchronizedSet(new HashSet());
}
public IJobHandler getHandler() {
return handler;
}

/**

  • new trigger to queue
  • @param triggerParam
  • @return

*/
public ReturnT pushTriggerQueue(TriggerParam triggerParam) {
// avoid repeat 若包含,则说明重复执行
if (triggerLogIdSet.contains(triggerParam.getLogId())) {
logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
return new ReturnT(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
}

triggerLogIdSet.add(triggerParam.getLogId());
triggerQueue.add(triggerParam);
return ReturnT.SUCCESS;
}

/**

  • kill job thread
  • @param stopReason
  • /

public void toStop(String stopReason) {
/**

  • Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep),
  • 在阻塞出抛出InterruptedException异常,但是并不会终止运行的线程本身;
  • 所以需要注意,此处彻底销毁本线程,需要通过共享变量方式;

*/
this.toStop = true;
this.stopReason = stopReason;
}

/**

  • is running job
  • @return

*/
public boolean isRunningOrHasQueue() {
return running || triggerQueue.size()>0;
}

@OverRide
public void run() {

// init
 try {
 	//执行初始化任务
 	handler.init();
 } catch (Throwable e) {
 	logger.error(e.getMessage(), e);
 }

 // execute
 while(!toStop){
 	//标记任务没在运行
 	running = false;
 	//统计空闲执行次数
 	idleTimes++;

     TriggerParam triggerParam = null;
     try {
 		// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
 		//获取触发器任务
 		triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
 		if (triggerParam!=null) {
 			//标记任务处于运行态
 			running = true;
 			//空闲次数重置
 			idleTimes = 0;
 			//删除logId主要用来判断是否重复执行
 			triggerLogIdSet.remove(triggerParam.getLogId());

 			// log filename, like "logPath/yyyy-MM-dd/9999.log"   创建log文件
 			String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
 			XxlJobContext xxlJobContext = new XxlJobContext(
 					triggerParam.getJobId(),
 					triggerParam.getExecutorParams(),
 					logFileName,
 					triggerParam.getBroadcastIndex(),
 					triggerParam.getBroadcastTotal());

 			// init job context
 			XxlJobContext.setXxlJobContext(xxlJobContext);

 			// execute
 			XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());
 			//设置了超时就异步线程处理
 			if (triggerParam.getExecutorTimeout() > 0) {
 				// limit timeout
 				Thread futureThread = null;
 				try {
 					FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
 						@Override
 						public Boolean call() throws Exception {

 							// init job context
 							XxlJobContext.setXxlJobContext(xxlJobContext);

 							handler.execute();
 							return true;
 						}
 					});
 					futureThread = new Thread(futureTask);
 					futureThread.start();
 					//异步线程处理并获取返回值
 					Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
 				} catch (TimeoutException e) {

 					XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
 					XxlJobHelper.log(e);

 					// handle result
 					XxlJobHelper.handleTimeout("job execute timeout ");
 				} finally {
 					futureThread.interrupt();
 				}
 			} else {
 				// just execute  没设置超时时间,则立刻执行触发器
 				handler.execute();
 			}

 			// valid execute handle data
 			if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
 				XxlJobHelper.handleFail("job handle result lost.");
 			} else {
 				String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
 				tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)
 						?tempHandleMsg.substring(0, 50000).concat("...")
 						:tempHandleMsg;
 				XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
 			}
 			XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
 					+ XxlJobContext.getXxlJobContext().getHandleCode()
 					+ ", handleMsg = "
 					+ XxlJobContext.getXxlJobContext().getHandleMsg()
 			);

 		} else {
 			//空闲执行次数超过30次,且队列没任务,则删除并终止线程
 			if (idleTimes > 30) {
 				if(triggerQueue.size() == 0) {	// avoid concurrent trigger causes jobId-lost
 					XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
 				}
 			}
 		}
 	} catch (Throwable e) {
 		if (toStop) {
 			XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
 		}

 		// handle result
 		StringWriter stringWriter = new StringWriter();
 		e.printStackTrace(new PrintWriter(stringWriter));
 		String errorMsg = stringWriter.toString();

 		XxlJobHelper.handleFail(errorMsg);

 		XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
 	} finally {
         if(triggerParam != null) {
             // callback handler info
             if (!toStop) {
                 // commonm  加入回调队列
                 TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                 		triggerParam.getLogId(),
 						triggerParam.getLogDateTime(),
 						XxlJobContext.getXxlJobContext().getHandleCode(),
 						XxlJobContext.getXxlJobContext().getHandleMsg() )
 				);
             } else {
                 // is killed
                 TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                 		triggerParam.getLogId(),
 						triggerParam.getLogDateTime(),
 						XxlJobContext.HANDLE_COCE_FAIL,
 						stopReason + " [job running, killed]" )
 				);
             }
         }
     }
 }

 // callback trigger request in queue
 while(triggerQueue !=null && triggerQueue.size()>0){
 	TriggerParam triggerParam = triggerQueue.poll();
 	if (triggerParam!=null) {
 		// is killed 加入回调队列
 		TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
 				triggerParam.getLogId(),
 				triggerParam.getLogDateTime(),
 				XxlJobContext.HANDLE_COCE_FAIL,
 				stopReason + " [job not executed, in the job queue, killed.]")
 		);
 	}
 }

 // destroy
 try {
 	//销毁,执行调度器设置的销毁方法
 	handler.destroy();
 } catch (Throwable e) {
 	logger.error(e.getMessage(), e);
 }

 logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());

}
}

zmeyuzjn

zmeyuzjn7#

package com.xxl.job.core.handler;

/**

  • job handler
  • @author xuxueli 2015-12-19 19:06:38

*/
public abstract class IJobHandler {

/**

  • execute handler, invoked when executor receives a scheduling request
  • @throws Exception

*/
public abstract void execute() throws Exception;

/@deprecated
public abstract ReturnT execute(String param) throws Exception;
/

/**

  • init handler, invoked when JobThread init

*/
public void init() throws Exception {
// do something
}

/**

  • destroy handler, invoked when JobThread destroy

*/
public void destroy() throws Exception {
// do something
}

}

sczxawaw

sczxawaw8#

package com.xxl.job.core.handler.impl;

import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.IJobHandler;

/**

  • glue job handler
  • @author xuxueli 2016-5-19 21:05:45

*/
public class GlueJobHandler extends IJobHandler {

private long glueUpdatetime;
private IJobHandler jobHandler;
public GlueJobHandler(IJobHandler jobHandler, long glueUpdatetime) {
this.jobHandler = jobHandler;
this.glueUpdatetime = glueUpdatetime;
}
public long getGlueUpdatetime() {
return glueUpdatetime;
}

@OverRide
public void execute() throws Exception {
XxlJobHelper.log("----------- glue.version:"+ glueUpdatetime +" -----------");
jobHandler.execute();
}

@OverRide
public void init() throws Exception {
this.jobHandler.init();
}

@OverRide
public void destroy() throws Exception {
this.jobHandler.destroy();
}
}

vq8itlhq

vq8itlhq9#

使用demo ,添加isNeedJobId =true,则传入jobId。默认不传jobId

@xxljob(value="demoJobHandler",init="init",isNeedJobId = true)
public void demoJobHandler() throws Exception {
XxlJobHelper.log("XXL-JOB, Hello World.");

for (int i = 0; i < 5; i++) {
        XxlJobHelper.log("beat at:" + i);
        TimeUnit.SECONDS.sleep(2);
    }
    // default success
}
iyr7buue

iyr7buue10#

内部封装了上下文
XxlJobHelper.getJobId()
但是只能执行调度方法才能调用.
/**

  • 1、简单任务示例(Bean模式)
    */
    @xxljob("demoJobHandler")
    public void demoJobHandler() throws Exception {
    XxlJobHelper.getJobId();
    }

相关问题