Usage and code examples for the io.vertx.core.Context class

x33g5p2x, reposted on 2022-01-18 under Other

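This article collects code examples for the Java class io.vertx.core.Context and shows how the Context class is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from selected projects and should serve as useful reference material. Details of the Context class are as follows:
Package path: io.vertx.core.Context
Class name: Context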

Introduction to Context

The execution context of an io.vertx.core.Handler execution.

When Vert.x provides an event to a handler or calls the start or stop methods of a io.vertx.core.Verticle, the execution is associated with a Context.

Usually a context is an event-loop context and is tied to a specific event loop thread. So executions for that context always occur on that exact same event loop thread.
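The thread affinity described above can be sketched with the JDK alone. This is a toy model, not Vert.x code: a single-threaded executor stands in for one event loop, and the class name `EventLoopModel` and the thread name `model-eventloop-0` are made up for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model, not Vert.x code: a single-threaded executor plays the role of
// one event loop, and every submitted task plays the role of a handler call.
class EventLoopModel {

  /** Runs `tasks` actions on the model event loop and returns the thread names that executed them. */
  static Set<String> threadsUsed(int tasks) {
    ExecutorService eventLoop =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "model-eventloop-0"));
    Set<String> seen = ConcurrentHashMap.newKeySet();
    CountDownLatch done = new CountDownLatch(tasks);
    try {
      for (int i = 0; i < tasks; i++) {
        eventLoop.execute(() -> {
          seen.add(Thread.currentThread().getName());
          done.countDown();
        });
      }
      if (!done.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("tasks did not finish in time");
      }
      return seen;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      eventLoop.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(threadsUsed(100)); // prints [model-eventloop-0]
  }
}
```

However many tasks are submitted, the returned set contains a single thread name, which is the property an event-loop context provides for its handlers.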

For worker verticles and inline blocking code, a worker context is associated with the execution, which uses a thread from the worker thread pool.
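As a rough illustration of running blocking code off the event loop, the sketch below uses two JDK executors. It is a simplified model and not how Vert.x implements worker contexts; `WorkerOffloadModel`, `blockingCall`, and the thread names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model, not Vert.x code: blocking work runs on a pool thread and its
// result is handed back to the single "event loop" thread.
class WorkerOffloadModel {

  /** Returns {thread that ran the blocking code, thread that received the result}. */
  static String[] run() {
    ExecutorService eventLoop =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "model-eventloop-0"));
    ExecutorService workerPool =
        Executors.newFixedThreadPool(2, r -> new Thread(r, "model-worker"));
    CompletableFuture<String[]> result = new CompletableFuture<>();
    try {
      // Start on the event loop, as a handler would.
      eventLoop.execute(() ->
          CompletableFuture
              // The blocking call is executed by a worker pool thread.
              .supplyAsync(WorkerOffloadModel::blockingCall, workerPool)
              // The result is delivered back on the event loop thread.
              .thenAcceptAsync(
                  blockingThread -> result.complete(
                      new String[] {blockingThread, Thread.currentThread().getName()}),
                  eventLoop));
      return result.get(5, TimeUnit.SECONDS);
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      eventLoop.shutdown();
      workerPool.shutdown();
    }
  }

  /** Stands in for blocking I/O; returns the name of the thread it ran on. */
  static String blockingCall() {
    try {
      Thread.sleep(50);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return Thread.currentThread().getName();
  }

  public static void main(String[] args) {
    String[] r = run();
    System.out.println("blocking code on " + r[0] + ", result on " + r[1]);
  }
}
```

The event loop thread is never put to sleep here: only the pool thread waits, which is the reason for moving blocking code to a worker thread.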

When a handler is set by a thread associated with a specific context, Vert.x guarantees that when that handler is executed, the execution will be associated with the same context.

If a handler is set by a thread not associated with a context (i.e. a non-Vert.x thread), then a new context will be created for that handler.

In other words, a context is propagated.

This means that when a verticle is deployed, any handlers it sets will be associated with the same context - the context of the verticle.
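Context propagation can be modeled in a few lines of plain Java. The sketch below is an assumption-laden toy: `Ctx`, `Source`, and all method names are invented for illustration, and a `ThreadLocal` is used to track the "current" context, which is not necessarily how Vert.x does it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model, not Vert.x code.
class PropagationModel {

  /** A model context: a name plus the single thread its tasks run on. */
  static final class Ctx {
    private static final ThreadLocal<Ctx> CURRENT = new ThreadLocal<>();
    private static final AtomicInteger IDS = new AtomicInteger();

    final String name = "ctx-" + IDS.getAndIncrement();
    private final ExecutorService loop = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, name);
      t.setDaemon(true); // keeps the sketch short: no explicit shutdown
      return t;
    });

    static Ctx current() { return CURRENT.get(); }

    /** Reuses the caller's context, or creates a new one for a plain thread. */
    static Ctx getOrCreate() {
      Ctx c = CURRENT.get();
      return c != null ? c : new Ctx();
    }

    void runOnContext(Runnable action) {
      loop.execute(() -> {
        CURRENT.set(this);
        try { action.run(); } finally { CURRENT.remove(); }
      });
    }
  }

  /** An event source: setting a handler captures the caller's context. */
  static final class Source {
    private Ctx handlerCtx;
    private Runnable handler;

    synchronized void handler(Runnable h) { handlerCtx = Ctx.getOrCreate(); handler = h; }
    synchronized void fire() { handlerCtx.runOnContext(handler); }
  }

  /** Returns {context the handler was set on, context the handler ran on}. */
  static String[] registerInsideContext() {
    Ctx verticleCtx = new Ctx();
    Source source = new Source();
    CompletableFuture<String> registeredOn = new CompletableFuture<>();
    CompletableFuture<String> ranOn = new CompletableFuture<>();
    verticleCtx.runOnContext(() -> {
      source.handler(() -> ranOn.complete(Ctx.current().name));
      registeredOn.complete(Ctx.current().name);
    });
    String registered = await(registeredOn);
    source.fire(); // fired from a plain thread, yet the handler runs on verticleCtx
    return new String[] {registered, await(ranOn)};
  }

  /** A handler set from a plain thread gets a freshly created context. */
  static String registerFromPlainThread() {
    Source source = new Source();
    CompletableFuture<String> ranOn = new CompletableFuture<>();
    source.handler(() -> ranOn.complete(Ctx.current().name));
    source.fire();
    return await(ranOn);
  }

  private static String await(CompletableFuture<String> f) {
    try {
      return f.get(5, TimeUnit.SECONDS);
    } catch (Exception e) {
      throw new IllegalStateException(e);
    }
  }
}
```

In `registerInsideContext` both returned names are equal, because the handler was set while running on `verticleCtx`. In `registerFromPlainThread` the handler still runs on some context, one that was created at the moment the handler was set.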

This means (in the case of a standard verticle) that the verticle code will always be executed on the exact same thread, so you don't have to worry about multi-threaded access to the verticle state and you can code your application as single-threaded.
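To see why single-threaded execution removes the need for locks, consider this JDK-only sketch. `CounterVerticle` is a hypothetical stand-in for a verticle and a single-threaded executor stands in for its context. Events arrive from several producer threads, but the state is only ever touched by the one context thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model, not Vert.x code.
class SingleThreadedStateModel {

  static final class CounterVerticle {
    private int count; // plain field: no locks, no atomics
    private final ExecutorService context = Executors.newSingleThreadExecutor();

    /** May be called from any thread; the mutation itself runs on the context thread. */
    void onEvent() {
      context.execute(() -> count++);
    }

    /** Reads the counter on the context thread, then shuts the model context down. */
    int readAndClose() {
      Callable<Integer> read = () -> count;
      try {
        return context.submit(read).get(5, TimeUnit.SECONDS);
      } catch (Exception e) {
        throw new IllegalStateException(e);
      } finally {
        context.shutdown();
      }
    }
  }

  /** Fires producers * eventsEach events from concurrent threads and returns the final count. */
  static int run(int producers, int eventsEach) {
    CounterVerticle verticle = new CounterVerticle();
    List<Thread> threads = new ArrayList<>();
    for (int p = 0; p < producers; p++) {
      Thread t = new Thread(() -> {
        for (int i = 0; i < eventsEach; i++) {
          verticle.onEvent();
        }
      });
      threads.add(t);
      t.start();
    }
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return verticle.readAndClose();
  }

  public static void main(String[] args) {
    System.out.println(run(4, 1000)); // prints 4000
  }
}
```

If the producers incremented `count` directly, updates could be lost. Routing every mutation through the one context thread is what makes the plain `int` safe.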

This class also allows arbitrary data to be #put and #get on the context so it can be shared easily amongst different handlers of, for example, a verticle instance.
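The idea of sharing data through the context can be sketched as a small keyed store. The `Ctx` class below only mirrors the `put` and `get` names mentioned above. It is a hypothetical model backed by a `ConcurrentHashMap`, and the handler names and keys are invented for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy model, not Vert.x code.
class ContextDataModel {

  static final class Ctx {
    private final Map<String, Object> data = new ConcurrentHashMap<>();

    void put(String key, Object value) {
      data.put(key, value);
    }

    @SuppressWarnings("unchecked")
    <T> T get(String key) {
      return (T) data.get(key);
    }
  }

  static String run() {
    Ctx ctx = new Ctx();
    // Two independent handlers that only share the context object.
    Runnable startHandler = () -> ctx.put("greeting", "hello");
    Runnable requestHandler = () -> {
      String greeting = ctx.get("greeting");
      ctx.put("reply", greeting + " from another handler");
    };
    startHandler.run();
    requestHandler.run();
    return ctx.get("reply");
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints hello from another handler
  }
}
```

Neither handler holds a reference to the other. The context object is the only thing they have in common, which is the sharing pattern the paragraph above describes.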

This class also provides #runOnContext which allows an action to be executed asynchronously using the same context.
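"Asynchronously" here means the action is queued rather than run inline. A minimal JDK-only sketch of that ordering follows, with a single-threaded executor as an assumed stand-in for the context. `RunOnContextOrderModel` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model, not Vert.x code.
class RunOnContextOrderModel {

  /** Returns the order in which the three steps were recorded. */
  static List<String> run() {
    ExecutorService context = Executors.newSingleThreadExecutor();
    List<String> order = new ArrayList<>(); // only touched on the context thread
    Runnable handler = () -> {
      order.add("handler start");
      // Like runOnContext: the action is queued behind the current task.
      context.execute(() -> order.add("deferred action"));
      order.add("handler end");
    };
    Callable<List<String>> snapshot = () -> new ArrayList<>(order);
    try {
      context.submit(handler).get(5, TimeUnit.SECONDS);
      // Queued after the deferred action, so it observes all three entries.
      return context.submit(snapshot).get(5, TimeUnit.SECONDS);
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      context.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [handler start, handler end, deferred action]
  }
}
```

The deferred action runs only after the handler that scheduled it has returned, even though both execute on the same thread.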

Code examples

Code example source: eclipse-vertx/vert.x

public void undeployAll(Handler<AsyncResult<Void>> completionHandler) {
 // TODO timeout if it takes too long - e.g. async stop verticle fails to call future
 // We only deploy the top level verticles as the children will be undeployed when the parent is
 Set<String> deploymentIDs = new HashSet<>();
 for (Map.Entry<String, Deployment> entry: deployments.entrySet()) {
  if (!entry.getValue().isChild()) {
   deploymentIDs.add(entry.getKey());
  }
 }
 if (!deploymentIDs.isEmpty()) {
  AtomicInteger count = new AtomicInteger(0);
  for (String deploymentID : deploymentIDs) {
   undeployVerticle(deploymentID, ar -> {
    if (ar.failed()) {
     // Log but carry on regardless
     log.error("Undeploy failed", ar.cause());
    }
    if (count.incrementAndGet() == deploymentIDs.size()) {
     completionHandler.handle(Future.succeededFuture());
    }
   });
  }
 } else {
  Context context = vertx.getOrCreateContext();
  context.runOnContext(v -> completionHandler.handle(Future.succeededFuture()));
 }
}

Code example source: eclipse-vertx/vert.x

@Override
public void start() throws Exception {
 processArgs = context.processArgs();
 conf = context.config();
 // if (Thread.currentThread().getContextClassLoader() != getClass().getClassLoader()) {
 //   throw new IllegalStateException("Wrong tccl!");
 // }
 vertx.eventBus().send("testcounts",
  new JsonObject().put("deploymentID", context.deploymentID()).put("count", instanceCount.incrementAndGet()));
}

Code example source: eclipse-vertx/vert.x

@Override
public HttpServerFileUpload streamToFileSystem(String filename) {
 pause();
 context.owner().fileSystem().open(filename, new OpenOptions(), ar -> {
  if (ar.succeeded()) {
   file =  ar.result();
   Pump p = Pump.pump(HttpServerFileUploadImpl.this, ar.result());
   p.start();
   resume();
  } else {
   notifyExceptionHandler(ar.cause());
  }
 });
 return this;
}

Code example source: eclipse-vertx/vert.x

void acquireLock() {
 if (status.compareAndSet(Status.WAITING, Status.ACQUIRED)) {
  if (timerId != null) {
   context.owner().cancelTimer(timerId);
  }
  context.runOnContext(v -> handler.handle(Future.succeededFuture(new AsyncLock(lockName))));
 } else {
  context.runOnContext(v -> nextWaiter(lockName));
 }
}

Code example source: eclipse-vertx/vert.x

@Override
 public void start() throws Exception {
  thread.set(Thread.currentThread());
  assertTrue(Context.isOnVertxThread());
  assertTrue(Context.isOnWorkerThread());
  assertFalse(Context.isOnEventLoopThread());
  assertTrue(Thread.currentThread().getName().startsWith(poolName + "-"));
  context.runOnContext(v -> {
   vertx.undeploy(context.deploymentID());
  });
 }
}, new DeploymentOptions().setWorker(true).setWorkerPoolName(poolName), onSuccess(deployment::set));

Code example source: eclipse-vertx/vert.x

@Test
public void testNetSocketStreamCallbackIsAsync() {
 this.server = vertx.createNetServer(new NetServerOptions());
 AtomicInteger done = new AtomicInteger();
 ReadStream<NetSocket> stream = server.connectStream();
 stream.handler(req -> {});
 stack.set(true);
 stream.endHandler(v -> {
  assertTrue(Vertx.currentContext().isEventLoopContext());
  assertNull(stack.get());
  if (done.incrementAndGet() == 2) {
   testComplete();
  assertTrue(Vertx.currentContext().isEventLoopContext());
  assertNull(stack.get());
  ThreadLocal<Object> stack2 = new ThreadLocal<>();
  stack2.set(true);
  server.close(v -> {
   assertTrue(Vertx.currentContext().isEventLoopContext());
   assertNull(stack2.get());
   if (done.incrementAndGet() == 2) {
    testComplete();

Code example source: eclipse-vertx/vert.x

CompletableFuture<Void> done = new CompletableFuture<>();
AtomicBoolean paused = new AtomicBoolean();
AtomicInteger numPause = new AtomicInteger();
server.requestHandler(req -> {
 Context ctx = vertx.getOrCreateContext();
 done.thenAccept(v1 -> {
  paused.set(false);
  ctx.runOnContext(v2 -> {
   req.resume();
  });
 });
 numPause.incrementAndGet();
 req.pause();
 paused.set(true);
 fail();
});
AtomicInteger sent = new AtomicInteger();
AtomicInteger count = new AtomicInteger();
AtomicInteger drained = new AtomicInteger();
vertx.setPeriodic(1, timerID -> {
 Context ctx = vertx.getOrCreateContext();
 if (req.writeQueueFull()) {
  assertTrue(paused.get());

Code example source: eclipse-vertx/vert.x

@Test
public void testMultipleServerClose() {
 this.server = vertx.createHttpServer(new HttpServerOptions().setPort(DEFAULT_HTTP_PORT));
 AtomicInteger times = new AtomicInteger();
 // We assume the endHandler and the close completion handler are invoked in the same context task
 ThreadLocal<Object> stack = new ThreadLocal<>();
 stack.set(true);
 server.websocketStream().endHandler(v -> {
  assertNull(stack.get());
  assertTrue(Vertx.currentContext().isEventLoopContext());
  times.incrementAndGet();
 });
 server.close(ar1 -> {
  assertNull(stack.get());
  assertTrue(Vertx.currentContext().isEventLoopContext());
  server.close(ar2 -> {
   server.close(ar3 -> {
    assertEquals(1, times.get());
    testComplete();
   });
  });
 });
 await();
}

Code example source: eclipse-vertx/vert.x

@Test
public void testMultipleServerClose() {
 this.server = vertx.createHttpServer(new HttpServerOptions().setPort(DEFAULT_HTTP_PORT));
 AtomicInteger times = new AtomicInteger();
 // We assume the endHandler and the close completion handler are invoked in the same context task
 ThreadLocal<Object> stack = new ThreadLocal<>();
 stack.set(true);
 server.requestStream().endHandler(v -> {
  assertNull(stack.get());
  assertTrue(Vertx.currentContext().isEventLoopContext());
  times.incrementAndGet();
 });
 server.close(ar1 -> {
  assertNull(stack.get());
  assertTrue(Vertx.currentContext().isEventLoopContext());
  server.close(ar2 -> {
   server.close(ar3 -> {
    assertEquals(1, times.get());
    testComplete();
   });
  });
 });
 await();
}

Code example source: eclipse-vertx/vert.x

@Test
public void testCheckThatPauseAfterResumeWontDoAnyEmission() {
 context.runOnContext(v1 -> {
  buffer = new InboundBuffer<>(context, 4L);
  AtomicInteger emitted = new AtomicInteger();
  buffer.handler(elt -> emitted.incrementAndGet());
  buffer.pause();
  fill();
  // Resume will execute an asynchronous drain operation
  buffer.resume();
  // Pause just after to ensure that no elements will be delivered to the handler
  buffer.pause();
  // Give enough time to have elements delivered
  vertx.setTimer(20, id -> {
   // Check we haven't received anything
   assertEquals(0, emitted.get());
   testComplete();
  });
 });
 await();
}

Code example source: eclipse-vertx/vert.x

int contentLength = numBuffers * chunkSize;
AtomicReference<HttpServerMetric> serverMetric = new AtomicReference<>();
server.requestHandler(req -> {
 assertEquals(protocol, req.version());
 FakeHttpServerMetrics serverMetrics = FakeMetricsBase.getMetrics(server);
});
startServer();
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<HttpClientMetric> clientMetric = new AtomicReference<>();
FakeHttpClientMetrics metrics = FakeMetricsBase.getMetrics(client);
Context ctx = vertx.getOrCreateContext();
ctx.runOnContext(v -> {
 assertEquals(Collections.emptySet(), metrics.endpoints());
 HttpClientRequest req = client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath",
    assertNull(metrics.getMetric(resp.request()));
    assertEquals(contentLength, buff.length());
    latch.countDown();
   });
  }))

Code example source: eclipse-vertx/vert.x

CountDownLatch requestBeginLatch = new CountDownLatch(1);
CountDownLatch requestBodyLatch = new CountDownLatch(1);
CountDownLatch requestEndLatch = new CountDownLatch(1);
CompletableFuture<Void> beginResponse = new CompletableFuture<>();
CompletableFuture<Void> endResponse = new CompletableFuture<>();
server.requestHandler(req -> {
 Context ctx = vertx.getOrCreateContext();
 beginResponse.thenAccept(v1 -> {
  ctx.runOnContext(v2 -> {
   req.response().setChunked(true).write(TestUtils.randomAlphaString(1024));
  });
 });
 endResponse.thenAccept(v1 -> {
  ctx.runOnContext(v2 -> {
   req.response().end();
  });
server.listen(8080, "localhost", onSuccess(s -> { listenLatch.countDown(); }));
awaitLatch(listenLatch);
FakeHttpClientMetrics clientMetrics = FakeMetricsBase.getMetrics(client);

Code example source: eclipse-vertx/vert.x

@Test
public void testWebsocketStreamCallbackAsynchronously() {
 this.server = vertx.createHttpServer(new HttpServerOptions().setPort(DEFAULT_HTTP_PORT));
 AtomicInteger done = new AtomicInteger();
 ReadStream<ServerWebSocket> stream = server.websocketStream();
 stream.handler(req -> { });
 ThreadLocal<Object> stack = new ThreadLocal<>();
 stack.set(true);
 stream.endHandler(v -> {
  assertTrue(Vertx.currentContext().isEventLoopContext());
  assertNull(stack.get());
  if (done.incrementAndGet() == 2) {
   testComplete();
  }
 });
 server.listen(ar -> {
  assertTrue(Vertx.currentContext().isEventLoopContext());
  assertNull(stack.get());
  ThreadLocal<Object> stack2 = new ThreadLocal<>();
  stack2.set(true);
  server.close(v -> {
   assertTrue(Vertx.currentContext().isEventLoopContext());
   assertNull(stack2.get());
   if (done.incrementAndGet() == 2) {
    testComplete();
   }
  });
 });
 await();
}

Code example source: eclipse-vertx/vert.x

@Test
public void testCloseServerAsynchronously() {
 this.server = vertx.createHttpServer(new HttpServerOptions().setPort(HttpTestBase.DEFAULT_HTTP_PORT));
 AtomicInteger done = new AtomicInteger();
 ReadStream<HttpServerRequest> stream = server.requestStream();
 stream.handler(req -> {});
 ThreadLocal<Object> stack = new ThreadLocal<>();
 stack.set(true);
 stream.endHandler(v -> {
  assertTrue(Vertx.currentContext().isEventLoopContext());
  assertNull(stack.get());
  if (done.incrementAndGet() == 2) {
   testComplete();
  }
 });
 server.listen(ar -> {
  assertTrue(Vertx.currentContext().isEventLoopContext());
  assertNull(stack.get());
  ThreadLocal<Object> stack2 = new ThreadLocal<>();
  stack2.set(true);
  server.close(v -> {
   assertTrue(Vertx.currentContext().isEventLoopContext());
   assertNull(stack2.get());
   if (done.incrementAndGet() == 2) {
    testComplete();
   }
  });
 });
 await();
}

Code example source: eclipse-vertx/vert.x

int size = getOptions().getWorkerPoolSize();
List<Context> workers = createWorkers(size + 1);
CountDownLatch latch1 = new CountDownLatch(workers.size() - 1);
workers.get(0).runOnContext(v -> {
 NetServer server = vertx.createNetServer();
 server.connectHandler(so -> {
  so.handler(buf -> {
 });
 server.listen(testAddress, ar -> {
  assertTrue(ar.succeeded());
   workers.get(i).runOnContext(v2 -> {
    latch1.countDown();
    try {
     Thread.sleep(1000);
});
awaitLatch(latch1);
NetClient client = vertx.createNetClient();
client.connect(testAddress, ar -> {
 assertTrue(ar.succeeded());
 NetSocket so = ar.result();
 so.write(Buffer.buffer("hello"));
});

Code example source: eclipse-vertx/vert.x

@Test
public void testAcquireDifferentLocksOnSameEventLoop() {
 Vertx vertx = getVertx();
 Context context = vertx.getOrCreateContext();
 SharedData sharedData = vertx.sharedData();
 AtomicInteger stage = new AtomicInteger();
 context.runOnContext(v -> {
  sharedData.getLock("foo", onSuccess(foo -> {
   assertTrue(stage.compareAndSet(0, 1));
   // Create another lock request
   sharedData.getLock("foo", onSuccess(foo1 -> {
    assertEquals(2, stage.get());
    foo1.release();
    testComplete();
   }));
   // Should not be blocked by second request for lock "foo"
   sharedData.getLock("bar", onSuccess(bar -> {
    assertTrue(stage.compareAndSet(1, 2));
    foo.release();
    bar.release();
   }));
  }));
 });
 await();
}

Code example source: eclipse-vertx/vert.x

@Test
public void testOrdered() {
 String poolName = "vert.x-" + TestUtils.randomAlphaString(10);
 WorkerExecutor worker = vertx.createSharedWorkerExecutor(poolName);
 int num = 1000;
 AtomicReference<Thread> t = new AtomicReference<>();
 CountDownLatch submitted = new CountDownLatch(1);
 Context ctx = vertx.getOrCreateContext();
 ctx.runOnContext(v -> {
  for (int i = 0;i < num;i++) {
   boolean first = i == 0;
    fut.complete(null);
   }, ar -> {
    if (last) {
  submitted.countDown();
 });
 await();

Code example source: eclipse-vertx/vert.x

CountDownLatch latch = new CountDownLatch(1);
Vertx vertx = vertx(new VertxOptions().setMetricsOptions(new MetricsOptions().setEnabled(true).setFactory(factory)));
Context ctx = contextFactory.apply(vertx);
ctx.runOnContext(v1 -> {
 NetServer server = vertx.createNetServer().connectHandler(so -> {
  so.handler(buf -> {
   so.write("bye");
 server.listen(1234, "localhost", onSuccess(s -> {
  expectedThread.set(Thread.currentThread());
  expectedContext.set(Vertx.currentContext());
  checker.accept(expectedThread.get(), expectedContext.get());
  latch.countDown();
 }));
});
awaitLatch(latch);
NetClient client = vertx.createNetClient();
client.connect(1234, "localhost", onSuccess(so -> {
 so.handler(buf -> {

Code example source: eclipse-vertx/vert.x

@Test
public void testContextExceptionHandlerFailing() {
 RuntimeException failure = new RuntimeException();
 Context context = vertx.getOrCreateContext();
 AtomicInteger count = new AtomicInteger();
 context.exceptionHandler(err -> {
  if (count.getAndIncrement() == 0) {
   throw new RuntimeException();
  } else {
   assertSame(failure, err);
   testComplete();
  }
 });
 context.runOnContext(v -> {
  throw new RuntimeException();
 });
 context.runOnContext(v -> {
  throw failure;
 });
 await();
}

Code example source: eclipse-vertx/vert.x

io.vertx.core.http.Http2Settings expectedSettings = TestUtils.randomHttp2Settings();
expectedSettings.setHeaderTableSize((int)io.vertx.core.http.Http2Settings.DEFAULT_HEADER_TABLE_SIZE);
server.close();
server = vertx.createHttpServer(serverOptions);
Context otherContext = vertx.getOrCreateContext();
server.connectionHandler(conn -> {
 otherContext.runOnContext(v -> {
  conn.updateSettings(expectedSettings);
 });
});
server.requestHandler(req -> {
});
startServer();
AtomicInteger count = new AtomicInteger();
client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", onFailure(resp -> {})).connectionHandler(conn -> {
 conn.remoteSettingsHandler(settings -> {
  switch (count.getAndIncrement()) {
   case 0:
    assertEquals(expectedSettings.getMaxHeaderListSize(), settings.getMaxHeaderListSize());
