io.vertx.core.eventbus.Message.reply()方法的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(9.6k)|赞(0)|评价(0)|浏览(150)

本文整理了Java中io.vertx.core.eventbus.Message.reply()方法的一些代码示例,展示了Message.reply()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Message.reply()方法的具体详情如下:
包路径:io.vertx.core.eventbus.Message
类名称:Message
方法名:reply

Message.reply介绍

[英]Reply to this message.

If the message was sent specifying a reply handler, that handler will be called when it has received a reply. If the message wasn't sent specifying a receipt handler this method does nothing.
[中]回复此消息。
如果发送消息时指定了回复处理程序,则在收到回复时将调用该处理程序。如果未发送指定收据处理程序的消息,则此方法不会执行任何操作。

代码示例

代码示例来源:origin: vert-x3/vertx-examples

@Override
 public void start() throws Exception {
  System.out.println("[Worker] Starting in " + Thread.currentThread().getName());

  vertx.eventBus().<String>consumer("sample.data", message -> {
   System.out.println("[Worker] Consuming data in " + Thread.currentThread().getName());
   String body = message.body();
   message.reply(body.toUpperCase());
  });
 }
}

代码示例来源:origin: vert-x3/vertx-examples

private void placeOrder(Message<JsonObject> msg) {
 mongo.save("orders", msg.body(), save -> {
  // error handling
  if (save.failed()) {
   msg.fail(500, save.cause().getMessage());
   return;
  }
  msg.reply(new JsonObject());
 });
}

代码示例来源:origin: vert-x3/vertx-examples

@Override
 public void start() {
  vertx.eventBus().<String>consumer("request", message -> message.reply("hello " + message.body()));

  vertx.createHttpServer()
    .requestHandler(request -> request.response().end("OK"))
    .listen(8080);
 }
}

代码示例来源:origin: vert-x3/vertx-examples

@Override
 public void start() throws Exception {

  EventBus eb = vertx.eventBus();

  eb.consumer("ping-address", message -> {

   System.out.println("Received message: " + message.body());
   // Now send back reply
   message.reply("pong!");
  });

  System.out.println("Receiver ready!");
 }
}

代码示例来源:origin: vert-x3/vertx-examples

private void registerHandler() {
    MessageConsumer<JsonObject> messageConsumer = eventBus.consumer(verticleAddress);
    messageConsumer.handler(message -> {
      JsonObject jsonMessage = message.body();
      System.out.println(jsonMessage.getValue("message_from_sender_verticle"));
      JsonObject jsonReply = new JsonObject().put("reply", "how interesting!");
      message.reply(jsonReply);
    });
  }
}

代码示例来源:origin: eclipse-vertx/vert.x

@Override
 public void start() throws Exception {
  vertices[1].eventBus().<String>consumer(ADDRESS1, msg -> {
   msg.reply(expectedBody);
  }).completionHandler(ar -> {
   assertTrue(ar.succeeded());
   latch.countDown();
  });
 }
}, new DeploymentOptions().setWorker(true));

代码示例来源:origin: vert-x3/vertx-examples

@Override
 public void start(Future<Void> startFuture) throws Exception {
  vertx.eventBus().<String>consumer("hello", message -> {
   message.reply("Hello " + message.body() + " from " + ID);
  }).completionHandler(startFuture);
 }
}

代码示例来源:origin: eclipse-vertx/vert.x

@Override
 public void start(Future<Void> startFuture) throws Exception {
  EventBus eventBus = getVertx().eventBus();
  MessageConsumer<String> consumer = eventBus.consumer("whatever");
  consumer.handler(m -> {
   if (!unregisterCalled) {
    consumer.unregister(v -> unregistered.set(true));
    unregisterCalled = true;
   }
   m.reply("ok");
  }).completionHandler(startFuture);
 }
};

代码示例来源:origin: eclipse-vertx/vert.x

@Test
public void testContextsSend() throws Exception {
 Set<ContextInternal> contexts = new ConcurrentHashSet<>();
 CountDownLatch latch = new CountDownLatch(2);
 vertx.eventBus().consumer(ADDRESS1).handler(msg -> {
  msg.reply("bar");
  contexts.add(((VertxInternal) vertx).getContext());
  latch.countDown();
 });
 vertx.eventBus().send(ADDRESS1, "foo", onSuccess((Message<Object> reply) -> {
  assertEquals("bar", reply.body());
  contexts.add(((VertxInternal) vertx).getContext());
  latch.countDown();
 }));
 awaitLatch(latch);
 assertEquals(2, contexts.size());
}

代码示例来源:origin: eclipse-vertx/vert.x

@Test
public void testReplyFailureTimeout2() throws Exception {
 CountDownLatch latch = new CountDownLatch(1);
 EventBus eb = vertx.eventBus();
 eb.consumer(ADDRESS1, msg -> {
  msg.reply("juu", new DeliveryOptions().setSendTimeout(10), ar -> {
   assertTrue(ar.failed());
   latch.countDown();
  });
 });
 eb.send(ADDRESS1, "bar", ar -> {
  // Do not reply
 });
 awaitLatch(latch);
 FakeEventBusMetrics metrics = FakeMetricsBase.getMetrics(eb);
 assertEquals(1, metrics.getReplyFailureAddresses().size());
 assertEquals(Collections.singletonList(ReplyFailure.TIMEOUT), metrics.getReplyFailures());
}

代码示例来源:origin: eclipse-vertx/vert.x

@Test
public void testReplyToPublish() {
 eb.<String>consumer(ADDRESS1).handler((Message<String> msg) -> {
  msg.reply("a reply");
  testComplete();
 });
 eb.publish(ADDRESS1, "whatever");
 await();
}

代码示例来源:origin: eclipse-vertx/vert.x

@Test
public void testReplyToSendWithNoReplyHandler() {
 eb.<String>consumer(ADDRESS1).handler((Message<String> msg) -> {
  msg.reply("a reply");
  testComplete();
 });
 eb.send(ADDRESS1, "whatever");
 await();
}

代码示例来源:origin: eclipse-vertx/vert.x

@Test
public void testSendWithTimeoutNoTimeoutAfterReply() {
 String str = TestUtils.randomUnicodeString(1000);
 long timeout = 1000;
 eb.<String>consumer(ADDRESS1).handler((Message<String> msg) -> {
  assertEquals(str, msg.body());
  msg.reply("a reply");
 });
 AtomicBoolean received = new AtomicBoolean();
 eb.send(ADDRESS1, str, new DeliveryOptions().setSendTimeout(timeout), (AsyncResult<Message<Integer>> ar) -> {
  assertFalse(received.get());
  assertTrue(ar.succeeded());
  received.set(true);
  // Now wait longer than timeout and make sure we don't receive any other reply
  vertx.setTimer(timeout * 2, tid -> {
   testComplete();
  });
 });
 await();
}

代码示例来源:origin: eclipse-vertx/vert.x

@Test
public void testSendWithTimeoutNoTimeoutReply() {
 String str = TestUtils.randomUnicodeString(1000);
 eb.<String>consumer(ADDRESS1).handler((Message<String> msg) -> {
  assertEquals(str, msg.body());
  msg.reply(23);
 });
 long timeout = 1000;
 eb.send(ADDRESS1, str, new DeliveryOptions().setSendTimeout(timeout), (AsyncResult<Message<Integer>> ar) -> {
  assertTrue(ar.succeeded());
  assertEquals(23, (int) ar.result().body());
  testComplete();
 });
 await();
}

代码示例来源:origin: eclipse-vertx/vert.x

@Override
public void start() throws Exception {
 JsonObject config = config();
 id = config.getInteger("id");
 numAddresses = config.getInteger("addressesCount");
 List<Future> registrationFutures = new ArrayList<>(numAddresses);
 for (int i = 0; i < numAddresses; i++) {
  Future<Void> registrationFuture = Future.future();
  registrationFutures.add(registrationFuture);
  vertx.eventBus().consumer(createAddress(id, i), msg -> msg.reply("pong")).completionHandler(registrationFuture.completer());
 }
 Future<Void> registrationFuture = Future.future();
 registrationFutures.add(registrationFuture);
 vertx.eventBus().consumer("ping", this::ping).completionHandler(registrationFuture.completer());
 CompositeFuture.all(registrationFutures).setHandler(ar -> {
  if (ar.succeeded()) {
   vertx.eventBus().send("control", "start");
  }
 });
}

代码示例来源:origin: eclipse-vertx/vert.x

@Test
public void testSendReplyWithTimeoutNoTimeout() {
 String str = TestUtils.randomUnicodeString(1000);
 String reply = TestUtils.randomUnicodeString(1000);
 String replyReply = TestUtils.randomUnicodeString(1000);
 eb.<String>consumer(ADDRESS1).handler((Message<String> msg) -> {
  assertEquals(str, msg.body());
  long timeout = 1000;
  msg.reply(reply, new DeliveryOptions().setSendTimeout(timeout), ar -> {
   assertTrue(ar.succeeded());
   assertEquals(replyReply, ar.result().body());
   testComplete();
  });
 });
 eb.send(ADDRESS1, str, onSuccess((Message<String>msg) -> {
  assertEquals(reply, msg.body());
  msg.reply(replyReply);
 }));
 await();
}

代码示例来源:origin: eclipse-vertx/vert.x

@Test
public void testReplyToReply() {
 String str = TestUtils.randomUnicodeString(1000);
 String reply = TestUtils.randomUnicodeString(1000);
 String replyReply = TestUtils.randomUnicodeString(1000);
 eb.<String>consumer(ADDRESS1).handler((Message<String> msg) -> {
  assertEquals(str, msg.body());
  msg.reply(reply, onSuccess((Message<String> rep) -> {
   assertEquals(replyReply, rep.body());
   testComplete();
  }));
 });
 eb.send(ADDRESS1, str, onSuccess((Message<String>msg) -> {
  assertEquals(reply, msg.body());
  msg.reply(replyReply);
 }));
 await();
}

代码示例来源:origin: eclipse-vertx/vert.x

@Test
public void testSendWithTimeoutReplyAfterTimeout() {
 String str = TestUtils.randomUnicodeString(1000);
 long timeout = 1000;
 eb.<String>consumer(ADDRESS1).handler((Message<String> msg) -> {
  assertEquals(str, msg.body());
  vertx.setTimer((int)(timeout * 1.5), id -> {
   msg.reply("too late!");
  });
 });
 eb.send(ADDRESS1, str, new DeliveryOptions().setSendTimeout(timeout), (AsyncResult<Message<Integer>> ar) -> {
  assertFalse(ar.succeeded());
  Throwable cause = ar.cause();
  assertTrue(cause instanceof ReplyException);
  ReplyException re = (ReplyException) cause;
  assertEquals(-1, re.failureCode());
  assertEquals(ReplyFailure.TIMEOUT, re.failureType());
  testComplete();
 });
 await();
}

代码示例来源:origin: eclipse-vertx/vert.x

@Test
public void testSendWithReply() {
 String str = TestUtils.randomUnicodeString(1000);
 String reply = TestUtils.randomUnicodeString(1000);
 eb.<String>consumer(ADDRESS1).handler((Message<String> msg) -> {
  assertEquals(str, msg.body());
  msg.reply(reply);
 });
 eb.send(ADDRESS1, str, onSuccess((Message<String> msg) -> {
  assertEquals(reply, msg.body());
  testComplete();
 }));
 await();
}

代码示例来源:origin: eclipse-vertx/vert.x

@Test
 public void testLocalOnlyDoesNotApplyToReplies() {
  startNodes(2);
  vertices[1].eventBus().consumer(ADDRESS1).handler(msg -> {
   msg.reply("pong", new DeliveryOptions().setLocalOnly(true));
  }).completionHandler(onSuccess(v -> {
   vertices[0].eventBus().send(ADDRESS1, "ping", new DeliveryOptions().setSendTimeout(500), onSuccess(msg -> testComplete()));
  }));
  await();
 }
}

相关文章