import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.MDC;
/**
* A {@link ForkJoinPool} that inherits MDC contexts from the thread that queues a task.
*
* @author Gili Tzabari
*/
public final class MdcForkJoinPool extends ForkJoinPool
{
/**
* Creates a new MdcForkJoinPool.
*
* @param parallelism the parallelism level. For default value, use {@link java.lang.Runtime#availableProcessors}.
* @param factory the factory for creating new threads. For default value, use
* {@link #defaultForkJoinWorkerThreadFactory}.
* @param handler the handler for internal worker threads that terminate due to unrecoverable errors encountered
* while executing tasks. For default value, use {@code null}.
* @param asyncMode if true, establishes local first-in-first-out scheduling mode for forked tasks that are never
* joined. This mode may be more appropriate than default locally stack-based mode in applications
* in which worker threads only process event-style asynchronous tasks. For default value, use
* {@code false}.
* @throws IllegalArgumentException if parallelism less than or equal to zero, or greater than implementation limit
* @throws NullPointerException if the factory is null
* @throws SecurityException if a security manager exists and the caller is not permitted to modify threads
* because it does not hold
* {@link java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public MdcForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler,
boolean asyncMode)
{
super(parallelism, factory, handler, asyncMode);
}
@Override
public void execute(ForkJoinTask<?> task)
{
// See http://stackoverflow.com/a/19329668/14731
super.execute(wrap(task, MDC.getCopyOfContextMap()));
}
@Override
public void execute(Runnable task)
{
// See http://stackoverflow.com/a/19329668/14731
super.execute(wrap(task, MDC.getCopyOfContextMap()));
}
private <T> ForkJoinTask<T> wrap(ForkJoinTask<T> task, Map<String, String> newContext)
{
return new ForkJoinTask<T>()
{
private static final long serialVersionUID = 1L;
/**
* If non-null, overrides the value returned by the underlying task.
*/
private final AtomicReference<T> override = new AtomicReference<>();
@Override
public T getRawResult()
{
T result = override.get();
if (result != null)
return result;
return task.getRawResult();
}
@Override
protected void setRawResult(T value)
{
override.set(value);
}
@Override
protected boolean exec()
{
// According to ForkJoinTask.fork() "it is a usage error to fork a task more than once unless it has completed
// and been reinitialized". We therefore assume that this method does not have to be thread-safe.
Map<String, String> oldContext = beforeExecution(newContext);
try
{
task.invoke();
return true;
}
finally
{
afterExecution(oldContext);
}
}
};
}
private Runnable wrap(Runnable task, Map<String, String> newContext)
{
return () ->
{
Map<String, String> oldContext = beforeExecution(newContext);
try
{
task.run();
}
finally
{
afterExecution(oldContext);
}
};
}
/**
* Invoked before running a task.
*
* @param newValue the new MDC context
* @return the old MDC context
*/
private Map<String, String> beforeExecution(Map<String, String> newValue)
{
Map<String, String> previous = MDC.getCopyOfContextMap();
if (newValue == null)
MDC.clear();
else
MDC.setContextMap(newValue);
return previous;
}
/**
* Invoked after running a task.
*
* @param oldValue the old MDC context
*/
private void afterExecution(Map<String, String> oldValue)
{
if (oldValue == null)
MDC.clear();
else
MDC.setContextMap(oldValue);
}
}
和/或
import java.util.Map;
import java.util.concurrent.CountedCompleter;
import org.slf4j.MDC;
/**
* A {@link CountedCompleter} that inherits MDC contexts from the thread that queues a task.
*
* @author Gili Tzabari
* @param <T> The result type returned by this task's {@code get} method
*/
public abstract class MdcCountedCompleter<T> extends CountedCompleter<T>
{
private static final long serialVersionUID = 1L;
private final Map<String, String> newContext;
/**
* Creates a new MdcCountedCompleter instance using the MDC context of the current thread.
*/
protected MdcCountedCompleter()
{
this(null);
}
/**
* Creates a new MdcCountedCompleter instance using the MDC context of the current thread.
*
* @param completer this task's completer; {@code null} if none
*/
protected MdcCountedCompleter(CountedCompleter<?> completer)
{
super(completer);
this.newContext = MDC.getCopyOfContextMap();
}
/**
* The main computation performed by this task.
*/
protected abstract void computeWithContext();
@Override
public final void compute()
{
Map<String, String> oldContext = beforeExecution(newContext);
try
{
computeWithContext();
}
finally
{
afterExecution(oldContext);
}
}
/**
* Invoked before running a task.
*
* @param newValue the new MDC context
* @return the old MDC context
*/
private Map<String, String> beforeExecution(Map<String, String> newValue)
{
Map<String, String> previous = MDC.getCopyOfContextMap();
if (newValue == null)
MDC.clear();
else
MDC.setContextMap(newValue);
return previous;
}
/**
* Invoked after running a task.
*
* @param oldValue the old MDC context
*/
private void afterExecution(Map<String, String> oldValue)
{
if (oldValue == null)
MDC.clear();
else
MDC.setContextMap(oldValue);
}
}
4条答案
按热度按时间fv2wmkja1#
以下似乎对我有用:
和/或
1.在
MdcForkJoinPool
上运行任务,而不是在公共ForkJoinPool上运行。1.扩展
MdcCountedCompleter
而不是CountedCompleter
。zzlelutf2#
这里有一些额外的信息来沿着@Gili的回答。
显示解决方案有效的测试(注意,会有没有Context的行,但至少它们不会是错误的上下文,这是正常ForkJoinPool发生的情况)。
此外,这里还有一些在原始答案中应该被覆盖的其他方法。
7tofc5zh3#
我不熟悉
ForkJoinPool
,但您可以将感兴趣的MDC键/值传递给您示例化的ForkJoinTask
示例,然后将它们提交给ForkJoinPool
。从logback版本1.1.5开始,MDC值不由子线程继承,因此没有太多的选项。是的
1.在示例化
ForkJoinTask
示例时将相关的MDC键/值传递给它们1.扩展
ForkJoinPool
,以便将MDC键/值传递给新创建的线程1.创建自己的ThreadFactory,它将MDC键/值设置为新创建的线程
请注意,我实际上没有实现任何选项2。或3。
wecizke34#
我被同样的问题困住了。显然,每次需要运行并行Java流时都使用自定义ForkJoinPool并不理想,因为它需要大量代码。
然而,我想我找到了一个比主题创建者提出的更小的解决方案:
基本上,您只需要在某处拥有 mdcParallelStreamKeeper 方法并仅使用它。
UPDATE #1此方法中的MDC清理存在问题。