本文整理了Java中io.vertx.core.eventbus.Message.reply()
方法的一些代码示例,展示了Message.reply()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Message.reply()
方法的具体详情如下:
包路径:io.vertx.core.eventbus.Message
类名称:Message
方法名:reply
[英]Reply to this message.
If the message was sent specifying a reply handler, that handler will be called when it has received a reply. If the message wasn't sent specifying a receipt handler this method does nothing.
[中]回复此消息。
如果发送消息时指定了回复处理程序,则在收到回复时将调用该处理程序。如果未发送指定收据处理程序的消息,则此方法不会执行任何操作。
代码示例来源:origin: vert-x3/vertx-examples
@Override
public void start() throws Exception {
System.out.println("[Worker] Starting in " + Thread.currentThread().getName());
vertx.eventBus().<String>consumer("sample.data", message -> {
System.out.println("[Worker] Consuming data in " + Thread.currentThread().getName());
String body = message.body();
message.reply(body.toUpperCase());
});
}
}
代码示例来源:origin: vert-x3/vertx-examples
private void placeOrder(Message<JsonObject> msg) {
mongo.save("orders", msg.body(), save -> {
// error handling
if (save.failed()) {
msg.fail(500, save.cause().getMessage());
return;
}
msg.reply(new JsonObject());
});
}
代码示例来源:origin: vert-x3/vertx-examples
@Override
public void start() {
vertx.eventBus().<String>consumer("request", message -> message.reply("hello " + message.body()));
vertx.createHttpServer()
.requestHandler(request -> request.response().end("OK"))
.listen(8080);
}
}
代码示例来源:origin: vert-x3/vertx-examples
@Override
public void start() throws Exception {
EventBus eb = vertx.eventBus();
eb.consumer("ping-address", message -> {
System.out.println("Received message: " + message.body());
// Now send back reply
message.reply("pong!");
});
System.out.println("Receiver ready!");
}
}
代码示例来源:origin: vert-x3/vertx-examples
private void registerHandler() {
MessageConsumer<JsonObject> messageConsumer = eventBus.consumer(verticleAddress);
messageConsumer.handler(message -> {
JsonObject jsonMessage = message.body();
System.out.println(jsonMessage.getValue("message_from_sender_verticle"));
JsonObject jsonReply = new JsonObject().put("reply", "how interesting!");
message.reply(jsonReply);
});
}
}
代码示例来源:origin: eclipse-vertx/vert.x
@Override
public void start() throws Exception {
vertices[1].eventBus().<String>consumer(ADDRESS1, msg -> {
msg.reply(expectedBody);
}).completionHandler(ar -> {
assertTrue(ar.succeeded());
latch.countDown();
});
}
}, new DeploymentOptions().setWorker(true));
代码示例来源:origin: vert-x3/vertx-examples
@Override
public void start(Future<Void> startFuture) throws Exception {
vertx.eventBus().<String>consumer("hello", message -> {
message.reply("Hello " + message.body() + " from " + ID);
}).completionHandler(startFuture);
}
}
代码示例来源:origin: eclipse-vertx/vert.x
@Override
public void start(Future<Void> startFuture) throws Exception {
EventBus eventBus = getVertx().eventBus();
MessageConsumer<String> consumer = eventBus.consumer("whatever");
consumer.handler(m -> {
if (!unregisterCalled) {
consumer.unregister(v -> unregistered.set(true));
unregisterCalled = true;
}
m.reply("ok");
}).completionHandler(startFuture);
}
};
代码示例来源:origin: eclipse-vertx/vert.x
@Test
public void testContextsSend() throws Exception {
Set<ContextInternal> contexts = new ConcurrentHashSet<>();
CountDownLatch latch = new CountDownLatch(2);
vertx.eventBus().consumer(ADDRESS1).handler(msg -> {
msg.reply("bar");
contexts.add(((VertxInternal) vertx).getContext());
latch.countDown();
});
vertx.eventBus().send(ADDRESS1, "foo", onSuccess((Message<Object> reply) -> {
assertEquals("bar", reply.body());
contexts.add(((VertxInternal) vertx).getContext());
latch.countDown();
}));
awaitLatch(latch);
assertEquals(2, contexts.size());
}
代码示例来源:origin: eclipse-vertx/vert.x
@Test
public void testReplyFailureTimeout2() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
EventBus eb = vertx.eventBus();
eb.consumer(ADDRESS1, msg -> {
msg.reply("juu", new DeliveryOptions().setSendTimeout(10), ar -> {
assertTrue(ar.failed());
latch.countDown();
});
});
eb.send(ADDRESS1, "bar", ar -> {
// Do not reply
});
awaitLatch(latch);
FakeEventBusMetrics metrics = FakeMetricsBase.getMetrics(eb);
assertEquals(1, metrics.getReplyFailureAddresses().size());
assertEquals(Collections.singletonList(ReplyFailure.TIMEOUT), metrics.getReplyFailures());
}
代码示例来源:origin: eclipse-vertx/vert.x
@Test
public void testReplyToPublish() {
eb.<String>consumer(ADDRESS1).handler((Message<String> msg) -> {
msg.reply("a reply");
testComplete();
});
eb.publish(ADDRESS1, "whatever");
await();
}
代码示例来源:origin: eclipse-vertx/vert.x
@Test
public void testReplyToSendWithNoReplyHandler() {
eb.<String>consumer(ADDRESS1).handler((Message<String> msg) -> {
msg.reply("a reply");
testComplete();
});
eb.send(ADDRESS1, "whatever");
await();
}
代码示例来源:origin: eclipse-vertx/vert.x
@Test
public void testSendWithTimeoutNoTimeoutAfterReply() {
String str = TestUtils.randomUnicodeString(1000);
long timeout = 1000;
eb.<String>consumer(ADDRESS1).handler((Message<String> msg) -> {
assertEquals(str, msg.body());
msg.reply("a reply");
});
AtomicBoolean received = new AtomicBoolean();
eb.send(ADDRESS1, str, new DeliveryOptions().setSendTimeout(timeout), (AsyncResult<Message<Integer>> ar) -> {
assertFalse(received.get());
assertTrue(ar.succeeded());
received.set(true);
// Now wait longer than timeout and make sure we don't receive any other reply
vertx.setTimer(timeout * 2, tid -> {
testComplete();
});
});
await();
}
代码示例来源:origin: eclipse-vertx/vert.x
@Test
public void testSendWithTimeoutNoTimeoutReply() {
String str = TestUtils.randomUnicodeString(1000);
eb.<String>consumer(ADDRESS1).handler((Message<String> msg) -> {
assertEquals(str, msg.body());
msg.reply(23);
});
long timeout = 1000;
eb.send(ADDRESS1, str, new DeliveryOptions().setSendTimeout(timeout), (AsyncResult<Message<Integer>> ar) -> {
assertTrue(ar.succeeded());
assertEquals(23, (int) ar.result().body());
testComplete();
});
await();
}
代码示例来源:origin: eclipse-vertx/vert.x
@Override
public void start() throws Exception {
JsonObject config = config();
id = config.getInteger("id");
numAddresses = config.getInteger("addressesCount");
List<Future> registrationFutures = new ArrayList<>(numAddresses);
for (int i = 0; i < numAddresses; i++) {
Future<Void> registrationFuture = Future.future();
registrationFutures.add(registrationFuture);
vertx.eventBus().consumer(createAddress(id, i), msg -> msg.reply("pong")).completionHandler(registrationFuture.completer());
}
Future<Void> registrationFuture = Future.future();
registrationFutures.add(registrationFuture);
vertx.eventBus().consumer("ping", this::ping).completionHandler(registrationFuture.completer());
CompositeFuture.all(registrationFutures).setHandler(ar -> {
if (ar.succeeded()) {
vertx.eventBus().send("control", "start");
}
});
}
代码示例来源:origin: eclipse-vertx/vert.x
@Test
public void testSendReplyWithTimeoutNoTimeout() {
String str = TestUtils.randomUnicodeString(1000);
String reply = TestUtils.randomUnicodeString(1000);
String replyReply = TestUtils.randomUnicodeString(1000);
eb.<String>consumer(ADDRESS1).handler((Message<String> msg) -> {
assertEquals(str, msg.body());
long timeout = 1000;
msg.reply(reply, new DeliveryOptions().setSendTimeout(timeout), ar -> {
assertTrue(ar.succeeded());
assertEquals(replyReply, ar.result().body());
testComplete();
});
});
eb.send(ADDRESS1, str, onSuccess((Message<String>msg) -> {
assertEquals(reply, msg.body());
msg.reply(replyReply);
}));
await();
}
代码示例来源:origin: eclipse-vertx/vert.x
@Test
public void testReplyToReply() {
String str = TestUtils.randomUnicodeString(1000);
String reply = TestUtils.randomUnicodeString(1000);
String replyReply = TestUtils.randomUnicodeString(1000);
eb.<String>consumer(ADDRESS1).handler((Message<String> msg) -> {
assertEquals(str, msg.body());
msg.reply(reply, onSuccess((Message<String> rep) -> {
assertEquals(replyReply, rep.body());
testComplete();
}));
});
eb.send(ADDRESS1, str, onSuccess((Message<String>msg) -> {
assertEquals(reply, msg.body());
msg.reply(replyReply);
}));
await();
}
代码示例来源:origin: eclipse-vertx/vert.x
@Test
public void testSendWithTimeoutReplyAfterTimeout() {
String str = TestUtils.randomUnicodeString(1000);
long timeout = 1000;
eb.<String>consumer(ADDRESS1).handler((Message<String> msg) -> {
assertEquals(str, msg.body());
vertx.setTimer((int)(timeout * 1.5), id -> {
msg.reply("too late!");
});
});
eb.send(ADDRESS1, str, new DeliveryOptions().setSendTimeout(timeout), (AsyncResult<Message<Integer>> ar) -> {
assertFalse(ar.succeeded());
Throwable cause = ar.cause();
assertTrue(cause instanceof ReplyException);
ReplyException re = (ReplyException) cause;
assertEquals(-1, re.failureCode());
assertEquals(ReplyFailure.TIMEOUT, re.failureType());
testComplete();
});
await();
}
代码示例来源:origin: eclipse-vertx/vert.x
@Test
public void testSendWithReply() {
String str = TestUtils.randomUnicodeString(1000);
String reply = TestUtils.randomUnicodeString(1000);
eb.<String>consumer(ADDRESS1).handler((Message<String> msg) -> {
assertEquals(str, msg.body());
msg.reply(reply);
});
eb.send(ADDRESS1, str, onSuccess((Message<String> msg) -> {
assertEquals(reply, msg.body());
testComplete();
}));
await();
}
代码示例来源:origin: eclipse-vertx/vert.x
@Test
public void testLocalOnlyDoesNotApplyToReplies() {
startNodes(2);
vertices[1].eventBus().consumer(ADDRESS1).handler(msg -> {
msg.reply("pong", new DeliveryOptions().setLocalOnly(true));
}).completionHandler(onSuccess(v -> {
vertices[0].eventBus().send(ADDRESS1, "ping", new DeliveryOptions().setSendTimeout(500), onSuccess(msg -> testComplete()));
}));
await();
}
}
内容来源于网络,如有侵权,请联系作者删除!