Usage and code examples for the java.util.concurrent.Executors class

Reposted by x33g5p2x on 2022-01-17 in Other

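This article collects code examples for the Java class java.util.concurrent.Executors and shows how the class is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they are useful as references. Details of the Executors class:
Package: java.util.concurrent
Class name: Executors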

About Executors

Factory and utility methods for Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable classes defined in this package. This class supports the following kinds of methods:

  • Methods that create and return an ExecutorService set up with commonly useful configuration settings.
  • Methods that create and return a ScheduledExecutorService set up with commonly useful configuration settings.
  • Methods that create and return a "wrapped" ExecutorService that disables reconfiguration by making implementation-specific methods inaccessible.
  • Methods that create and return a ThreadFactory that sets newly created threads to a known state.
  • Methods that create and return a Callable out of other closure-like forms, so they can be used in execution methods requiring Callable.
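As a rough illustration of the five kinds of methods listed above, the following sketch calls one factory method of each kind. The class name ExecutorsKindsSketch and the method demo are hypothetical names chosen for this example; only the Executors calls themselves belong to the JDK.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only the Executors calls come from the JDK.
class ExecutorsKindsSketch {

    /** Uses one method of each kind and summarizes what was observed. */
    static String demo() {
        // 1. ExecutorService with a common configuration: a pool of two threads.
        ExecutorService fixed = Executors.newFixedThreadPool(2);
        // 2. ScheduledExecutorService with a common configuration: one scheduler thread.
        ScheduledExecutorService scheduled = Executors.newSingleThreadScheduledExecutor();
        // 3. "Wrapped" service: callers only see the ExecutorService interface,
        //    so ThreadPoolExecutor setters cannot be reached through a cast.
        ExecutorService wrapped = Executors.unconfigurableExecutorService(fixed);
        // 4. ThreadFactory that creates non-daemon, normal-priority threads.
        ThreadFactory factory = Executors.defaultThreadFactory();
        // 5. Callable built from a Runnable plus a fixed result value.
        Callable<String> callable = Executors.callable(() -> { }, "done");
        try {
            boolean daemon = factory.newThread(() -> { }).isDaemon();
            String viaPool = wrapped.submit(callable).get(5, TimeUnit.SECONDS);
            String viaScheduler = scheduled.schedule(callable, 10, TimeUnit.MILLISECONDS)
                    .get(5, TimeUnit.SECONDS);
            boolean castable = wrapped instanceof ThreadPoolExecutor;
            return viaPool + "," + viaScheduler + "," + daemon + "," + castable;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            fixed.shutdown();
            scheduled.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "done,done,false,false"
    }
}
```

The last two fields show that the default factory produces non-daemon threads and that the wrapped service is no longer a ThreadPoolExecutor.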

Code examples

Code example source: stackoverflow.com

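// SleepingCallable is not defined in this excerpt.
final ExecutorService pool = Executors.newFixedThreadPool(2);
final CompletionService<String> service = new ExecutorCompletionService<String>(pool);
final List<? extends Callable<String>> callables = Arrays.asList(
  new SleepingCallable("slow", 5000),
  new SleepingCallable("quick", 500));
for (final Callable<String> callable : callables) {
 service.submit(callable);
}
pool.shutdown();
try {
 // Take exactly one result per submitted task. Polling pool.isTerminated()
 // can leave take() blocked forever after the last result is consumed.
 for (int i = 0; i < callables.size(); i++) {
  final Future<String> future = service.take();
  System.out.println(future.get());
 }
} catch (InterruptedException ex) {
 Thread.currentThread().interrupt(); // restore the interrupt flag
} catch (ExecutionException ex) {
 ex.printStackTrace(); // a task threw; do not swallow it silently
}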
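Because SleepingCallable is not shown in the snippet above, here is a self-contained sketch of the same completion-order idea. The class CompletionOrderSketch and the helper sleepingTask are hypothetical stand-ins, and the delays are arbitrary values chosen to keep the run short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: results are collected in completion order.
class CompletionOrderSketch {

    // Hypothetical stand-in for SleepingCallable: sleeps, then returns its name.
    static Callable<String> sleepingTask(String name, long millis) {
        return () -> {
            Thread.sleep(millis);
            return name;
        };
    }

    static List<String> runInCompletionOrder() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> service = new ExecutorCompletionService<>(pool);
        List<Callable<String>> tasks = Arrays.asList(
                sleepingTask("slow", 400),
                sleepingTask("quick", 20));
        for (Callable<String> task : tasks) {
            service.submit(task);
        }
        pool.shutdown(); // no new tasks; already submitted ones still run
        List<String> results = new ArrayList<>();
        try {
            // One take() per submitted task, so the loop cannot block forever.
            for (int i = 0; i < tasks.size(); i++) {
                results.add(service.take().get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(runInCompletionOrder());
    }
}
```

With two pool threads both tasks start at once, so the shorter task is expected to be taken first even though it was submitted second.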

Code example source: skylot/jadx

public synchronized Future<Boolean> process() {
  if (future != null) {
    return future;
  }
  ExecutorService shutdownExecutor = Executors.newSingleThreadExecutor();
  FutureTask<Boolean> task = new ShutdownTask();
  shutdownExecutor.execute(task);
  shutdownExecutor.shutdown();
  future = task;
  return future;
}

Code example source: stackoverflow.com

ExecutorService es = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
  es.execute(() -> { /* your task */ });
}
es.shutdown();
// true if every task finished within one minute, false if the timeout elapsed first
boolean finished = es.awaitTermination(1, TimeUnit.MINUTES);
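The shutdown-then-wait pattern above can be wrapped into a small runnable sketch. ShutdownSketch and runAndWait are hypothetical names; the fallback to shutdownNow() on timeout follows the usage pattern described in the ExecutorService Javadoc and is not part of the original snippet.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class for the shutdown() + awaitTermination() pattern.
class ShutdownSketch {

    /** Runs taskCount trivial tasks and returns how many completed. */
    static int runAndWait(int taskCount) {
        ExecutorService es = Executors.newCachedThreadPool();
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            es.execute(completed::incrementAndGet);
        }
        es.shutdown(); // stop accepting new tasks; queued tasks still run
        try {
            // false means the timeout elapsed before all tasks finished
            if (!es.awaitTermination(1, TimeUnit.MINUTES)) {
                es.shutdownNow(); // interrupt whatever is still running
            }
        } catch (InterruptedException e) {
            es.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(5));
    }
}
```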

Code example source: stackoverflow.com

ExecutorService exec = Executors.newFixedThreadPool(4,
    new ThreadFactory() {
      public Thread newThread(Runnable r) {
        Thread t = Executors.defaultThreadFactory().newThread(r);
        t.setDaemon(true);
        return t;
      }
    });

// YourTaskNowWillBeDaemon is a placeholder for your own Runnable; it runs on a daemon thread.
exec.execute(YourTaskNowWillBeDaemon);
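A self-contained variant of the daemon-thread factory above might look like the following. DaemonFactorySketch, daemonFactory, and the "worker" name prefix are hypothetical; naming the threads is an extra convenience that the original snippet does not include.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: a pool whose worker threads are daemon threads.
class DaemonFactorySketch {

    /** Wraps the default factory, marking each new thread as a named daemon. */
    static ThreadFactory daemonFactory(String prefix) {
        ThreadFactory base = Executors.defaultThreadFactory();
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread t = base.newThread(runnable);
            t.setDaemon(true); // daemon threads do not keep the JVM alive
            t.setName(prefix + "-" + counter.incrementAndGet());
            return t;
        };
    }

    /** Runs one task and reports the worker thread's name and daemon flag. */
    static String describeWorker() {
        ExecutorService exec = Executors.newFixedThreadPool(4, daemonFactory("worker"));
        try {
            return exec.submit(() ->
                    Thread.currentThread().getName() + ":" + Thread.currentThread().isDaemon())
                    .get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeWorker()); // prints "worker-1:true"
    }
}
```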

Code example source: ReactiveX/RxJava

final int NUM_RETRIES = Flowable.bufferSize() * 2;
int ncpu = Runtime.getRuntime().availableProcessors();
ExecutorService exec = Executors.newFixedThreadPool(Math.max(ncpu / 2, 2));
try {
  for (int r = 0; r < NUM_LOOPS; r++) {
    final AtomicInteger timeouts = new AtomicInteger();
    final Map<Integer, List<String>> data = new ConcurrentHashMap<Integer, List<String>>();
    final CountDownLatch cdl = new CountDownLatch(m);
    for (int i = 0; i < m; i++) {
      final int j = i;
      exec.execute(new Runnable() {
        @Override
        public void run() {
          // ... (rest of the task and of the inner loop omitted in this excerpt)
    cdl.await();
    assertEquals(0, timeouts.get());
    if (data.size() > 0) {
      fail("Data content mismatch: " + allSequenceFrequency(data));
      // ...
  exec.shutdown();

Code example source: apache/zookeeper

@Test(expected = SSLHandshakeException.class)
public void testClientRenegotiationFails() throws Throwable {
  int port = PortAssignment.unique();
  ExecutorService workerPool = Executors.newCachedThreadPool();
  final SSLServerSocket listeningSocket = x509Util.createSSLServerSocket();
  SSLSocket clientSocket = null;
  SSLSocket serverSocket = null;
  final AtomicInteger handshakesCompleted = new AtomicInteger(0);
  try {
    InetSocketAddress localServerAddress = new InetSocketAddress(
        // ... (arguments omitted in this excerpt)
    listeningSocket.bind(localServerAddress);
    Future<SSLSocket> acceptFuture;
    acceptFuture = workerPool.submit(new Callable<SSLSocket>() {
      @Override
      public SSLSocket call() throws Exception {
        // ...
      serverSocket = acceptFuture.get();
    } catch (ExecutionException e) {
      throw e.getCause();
      // ...
    forceClose(clientSocket);
    forceClose(listeningSocket);
    workerPool.shutdown();
    // ...
    Assert.assertEquals(1, handshakesCompleted.get());

Code example source: spring-projects/spring-framework

@Test
public void nonSharedEngine() throws Exception {
  int iterations = 20;
  this.view.setEngineName("nashorn");
  this.view.setRenderFunction("render");
  this.view.setSharedEngine(false);
  this.view.setApplicationContext(this.context);
  ExecutorService executor = Executors.newFixedThreadPool(4);
  List<Future<Boolean>> results = new ArrayList<>();
  for (int i = 0; i < iterations; i++) {
    results.add(executor.submit(() -> view.getEngine() != null));
  }
  assertEquals(iterations, results.size());
  for (int i = 0; i < iterations; i++) {
    assertTrue(results.get(i).get());
  }
  executor.shutdown();
}
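The submit-then-collect loop in the test above can also be written with invokeAll, which submits a whole batch and returns the futures in the same order as the tasks. The sketch below is a standalone illustration under that assumption; InvokeAllSketch and squares are hypothetical names, and the squaring task merely stands in for real work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class: batch submission with invokeAll.
class InvokeAllSketch {

    /** Computes n*n for n in [0, count) on a pool of four threads. */
    static List<Integer> squares(int count) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                final int n = i;
                tasks.add(() -> n * n);
            }
            List<Integer> results = new ArrayList<>();
            // invokeAll blocks until every task is done and keeps task order.
            for (Future<Integer> future : executor.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown(); // runs even if a task failed
        }
    }

    public static void main(String[] args) {
        System.out.println(squares(4)); // prints "[0, 1, 4, 9]"
    }
}
```

Placing shutdown() in a finally block releases the pool threads even when a task throws.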

Code example source: google/guava

@GwtIncompatible // Threads
public void testTransformAsync_functionToString() throws Exception {
 final CountDownLatch functionCalled = new CountDownLatch(1);
 final CountDownLatch functionBlocking = new CountDownLatch(1);
 AsyncFunction<Object, Object> function =
   new AsyncFunction<Object, Object>() {
    @Override
    public ListenableFuture<Object> apply(Object input) throws Exception {
     functionCalled.countDown();
     functionBlocking.await();
     return immediateFuture(null);
    }
    @Override
    public String toString() {
     return "Called my toString";
    }
   };
 ExecutorService executor = Executors.newSingleThreadExecutor();
 try {
  ListenableFuture<?> output =
    Futures.transformAsync(immediateFuture(null), function, executor);
  functionCalled.await();
  assertThat(output.toString()).contains("Called my toString");
 } finally {
  functionBlocking.countDown();
  executor.shutdown();
 }
}

Code example source: org.apache.logging.log4j/log4j-core

@Test
public void testInheritableThreadContextImmutability() throws Throwable {
  prepareThreadContext(true);
  try {
    newSingleThreadExecutor().submit(new Runnable() {
      @Override
      public void run() {
        testContextDataInjector();
      }
    }).get();
  } catch (ExecutionException ee) {
    throw ee.getCause();
  }
}
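Several snippets here rethrow e.getCause() after catching ExecutionException. The sketch below shows why: an exception thrown inside a task reaches the caller of Future.get() wrapped in an ExecutionException. CauseUnwrapSketch and failureMessage are hypothetical names, and the failing task is contrived for the demonstration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class: unwrapping the cause of an ExecutionException.
class CauseUnwrapSketch {

    /** Submits a task that always fails and returns the original message. */
    static String failureMessage() {
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            Callable<String> failing = () -> {
                throw new IllegalStateException("boom");
            };
            Future<String> future = single.submit(failing);
            return future.get(); // throws ExecutionException wrapping the failure
        } catch (ExecutionException e) {
            // getCause() is the exception thrown inside the task.
            return e.getCause().getClass().getSimpleName() + ": " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            single.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(failureMessage()); // prints "IllegalStateException: boom"
    }
}
```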

Code example source: ReactiveX/RxJava

@Test
public void disposeRace() {
  ExecutorService exec = Executors.newSingleThreadExecutor();
  final Scheduler s = Schedulers.from(exec, true);
  try {
    for (int i = 0; i < 500; i++) {
      final Worker w = s.createWorker();
      final AtomicInteger c = new AtomicInteger(2);
      w.schedule(new Runnable() {
        @Override
        public void run() {
          c.decrementAndGet();
          while (c.get() != 0) { }
        }
      });
      c.decrementAndGet();
      while (c.get() != 0) { }
      w.dispose();
    }
  } finally {
    exec.shutdownNow();
  }
}

Code example source: openzipkin/brave

@Test public void isnt_inheritable() throws Exception {
 ExecutorService service = Executors.newCachedThreadPool();
 try (Scope scope = currentTraceContext.newScope(context)) {
  assertThat(service.submit(() -> {
   verifyImplicitContext(null);
   return currentTraceContext.get();
  }).get()).isNull();
 } catch (ExecutionException e) {
  if (e.getCause() instanceof Error) throw (Error) e.getCause();
  throw (Exception) e.getCause();
 }
 assertThat(service.submit(currentTraceContext::get).get())
   .isNull();
 verifyImplicitContext(null);
 service.shutdownNow();
}

Code example source: google/guava

int tasksPerThread = 10;
int nTasks = nThreads * tasksPerThread;
ExecutorService pool = Executors.newFixedThreadPool(nThreads);
ImmutableList<String> keys = ImmutableList.of("a", "b", "c");
try {
 List<Future<int[]>> futures = Lists.newArrayListWithExpectedSize(nTasks);
 for (int i = 0; i < nTasks; i++) {
  futures.add(pool.submit(new MutateTask(multiset, keys)));
  // ... (lines omitted in this excerpt)
  int[] taskDeltas = future.get();
  for (int i = 0; i < deltas.length; i++) {
   deltas[i] += taskDeltas[i];
   // ...
 assertEquals("Counts not as expected", Ints.asList(deltas), actualCounts);
} finally {
 pool.shutdownNow();
 // ...
 assertTrue("map should not contain a zero", value.get() != 0);

Code example source: google/guava

final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
for (int i = 0; i < 1000; i++) {
 final AtomicInteger counter = new AtomicInteger();
 final TrustedListenableFutureTask<Integer> task =
   TrustedListenableFutureTask.create(
     // ... (arguments and following lines omitted in this excerpt)
  executor.execute(wrapper);
  // ...
 assertEquals(1, counter.get());
 // ...
executor.shutdown();

Code example source: ReactiveX/RxJava

@Test(timeout = 30000)
public void testIssue2890NoStackoverflow() throws InterruptedException {
  final ExecutorService executor = Executors.newFixedThreadPool(2);
  final Scheduler sch = Schedulers.from(executor);
  final AtomicInteger counter = new AtomicInteger();
  // ... (lines omitted in this excerpt)
  executor.awaitTermination(20000, TimeUnit.MILLISECONDS);
  assertEquals(n, counter.get());
}

Code example source: google/guava

public void testEnqueueAndDispatch_multithreaded() throws InterruptedException {
 Object listener = new Object();
 ExecutorService service = Executors.newFixedThreadPool(4);
 ListenerCallQueue<Object> queue = new ListenerCallQueue<>();
 try {
  queue.addListener(listener, service);
  final CountDownLatch latch = new CountDownLatch(1);
  Multiset<Object> counters = ConcurrentHashMultiset.create();
  queue.enqueue(incrementingEvent(counters, listener, 1));
  queue.enqueue(incrementingEvent(counters, listener, 2));
  queue.enqueue(incrementingEvent(counters, listener, 3));
  queue.enqueue(incrementingEvent(counters, listener, 4));
  queue.enqueue(countDownEvent(latch));
  assertEquals(0, counters.size());
  queue.dispatch();
  latch.await();
  assertEquals(multiset(listener, 4), counters);
 } finally {
  service.shutdown();
 }
}

Code example source: thinkaurelius/titan

ExecutorService es = Executors.newFixedThreadPool(CONCURRENCY);
// ... (lines omitted in this excerpt)
      maxIterations, idAuthority, ids);
  // ...
  uids.add(idAuthority.getUniqueID());
  futures.add(es.submit(stressRunnable));
  // ...
    f.get();
  } catch (ExecutionException e) {
    throw e.getCause();
    // ...
es.shutdownNow();

Code example source: apache/incubator-gobblin

configuration.getClass(StressTestUtils.STRESSOR_CLASS, StressTestUtils.DEFAULT_STRESSOR_CLASS, Stressor.class);
ExecutorService executorService = Executors.newFixedThreadPool(stressorThreads);
ThrottlingPolicy policy = (ThrottlingPolicy) broker.getSharedResource(new ThrottlingPolicyFactory(),
  new SharedLimiterKey(resourceLimited));
ScheduledExecutorService reportingThread = Executors.newSingleThreadScheduledExecutor();
reportingThread.scheduleAtFixedRate(new Reporter(limiterContainer, policy), 0, 15, TimeUnit.SECONDS);
// ... (lines omitted in this excerpt)
 futures.add(executorService.submit(new StressorRunner(limiterContainer.decorateLimiter(restliLimiter),
   stressor)));
// ...
for (Future<?> future : futures) {
 try {
  future.get();
 } catch (ExecutionException ee) {
  stressorFailures++;
  // ...
executorService.shutdownNow();
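The periodic reporter in the snippet above relies on scheduleAtFixedRate. A minimal, self-contained sketch of that call follows. FixedRateSketch and waitForTicks are hypothetical names, and the 20 ms period is an arbitrary choice to keep the demo short (the original reports every 15 seconds).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: a periodic task on a single scheduler thread.
class FixedRateSketch {

    /** Returns true once the periodic task has run at least `ticks` times. */
    static boolean waitForTicks(int ticks) {
        ScheduledExecutorService reporter = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(ticks);
        // Initial delay 0, then one run every 20 ms until cancelled.
        ScheduledFuture<?> handle =
                reporter.scheduleAtFixedRate(remaining::countDown, 0, 20, TimeUnit.MILLISECONDS);
        try {
            return remaining.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            handle.cancel(false);   // stop future runs of the periodic task
            reporter.shutdownNow(); // release the scheduler thread
        }
    }

    public static void main(String[] args) {
        System.out.println(waitForTicks(3));
    }
}
```

A periodic task keeps running until it is cancelled or its executor is shut down, which is why the sketch does both in the finally block.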

Code example source: google/guava

public void testRejectedExecutionThrownWithMultipleCalls() throws Exception {
 final CountDownLatch latch = new CountDownLatch(1);
 final SettableFuture<?> future = SettableFuture.create();
 final Executor delegate =
   // ... (initializer omitted in this excerpt)
 final ExecutorService blocked = Executors.newCachedThreadPool();
 Future<?> first =
   blocked.submit(
     new Runnable() {
      @Override
      // ...
 } catch (RejectedExecutionException expected) {
 // ...
 latch.countDown();
 try {
  first.get(10, TimeUnit.SECONDS);
  fail();
 } catch (ExecutionException expected) {
 // ...

Code example source: OpenHFT/Chronicle-Queue

ExecutorService service1 = Executors.newSingleThreadExecutor();
ScheduledExecutorService service2 = null;
try {
  Future f = service1.submit(() -> {
    final ExcerptAppender appender = queue.acquireAppender();
    // ... (lines omitted in this excerpt)
  service2 = Executors.newSingleThreadScheduledExecutor();
  service2.scheduleAtFixedRate(() -> {
    Bytes b = Bytes.elasticHeapByteBuffer(128);
    final ExcerptTailer tailer = queue.createTailer();
    // ...
    if (bytes == null) {
      f.get(1, TimeUnit.SECONDS);
      throw new NullPointerException("nothing in result");
      // ...
    f.get(1, TimeUnit.SECONDS);
  } finally {
    bytes.release();
    // ...
  service1.shutdownNow();
  if (service2 != null)
    service2.shutdownNow();

Code example source: reactor/reactor-core

@Test
public void scanSupportBuffered() throws InterruptedException {
  Executor plain = Runnable::run;
  ExecutorService plainService = Executors.newSingleThreadExecutor();
  ExecutorService threadPool = Executors.newFixedThreadPool(3);
  ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(4);
  // ... (lines omitted in this excerpt)
        .as("plainService").isEqualTo(null);
    // ...
    scheduledThreadPool.schedule(() -> {}, 500, TimeUnit.MILLISECONDS);
    scheduledThreadPool.schedule(() -> {}, 500, TimeUnit.MILLISECONDS);
    Thread.sleep(50); //give some leeway for the pool to have consistent accounting
    // ...
        .as("scheduledThreadPool").isEqualTo(2);
    // ...
    threadPool.submit(() -> {
      try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); }
    });
    // ...
    plainService.shutdownNow();
    unsupportedScheduledExecutorService.shutdownNow();
    threadPool.shutdownNow();
    scheduledThreadPool.shutdownNow();
