io.vertx.core.eventbus.Message类的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(12.1k)|赞(0)|评价(0)|浏览(148)

本文整理了Java中io.vertx.core.eventbus.Message类的一些代码示例,展示了Message类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Message类的具体详情如下:
包路径:io.vertx.core.eventbus.Message
类名称:Message

Message介绍

[英]Represents a message that is received from the event bus in a handler.

Messages have a #body, which can be null, and also #headers, which can be empty.

If the message was sent specifying a reply handler, it can be replied to using #reply.

If you want to notify the sender that processing failed, then #fail can be called.
[中]表示在处理程序中从事件总线接收到的消息。
消息有#body(可以为null)和#headers(可以为空)。
如果发送消息时指定了回复处理程序,则可以使用#reply进行回复。
如果要通知发送方处理失败,则可以调用#fail。

代码示例

代码示例来源:origin: vert-x3/vertx-examples

/**
 * Lists products by sending a "list" operation to the "backend" event-bus address
 * and resuming the suspended JAX-RS response from the reply handler.
 *
 * <p>A non-null reply body is returned encoded as JSON, a null body yields 404 and a
 * failed send yields 500.
 *
 * @param asyncResponse the suspended JAX-RS response to resume
 * @param vertx the injected Vertx instance whose event bus is used
 */
@GET
 @Path("/products")
 @Produces({MediaType.APPLICATION_JSON})
 public void list(@Suspended final AsyncResponse asyncResponse, @Context Vertx vertx) {
  final JsonObject listRequest = new JsonObject().put("op", "list");

  vertx.eventBus().<JsonArray>send("backend", listRequest, reply -> {
   // The send itself failed: report a server error.
   if (reply.failed()) {
    asyncResponse.resume(Response.status(Response.Status.INTERNAL_SERVER_ERROR).build());
    return;
   }

   final JsonArray products = reply.result().body();
   if (products == null) {
    asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
    return;
   }
   asyncResponse.resume(products.encode());
  });
 }
}

代码示例来源:origin: eclipse-vertx/vert.x

/**
 * Sends with a timeout to a consumer that replies immediately, then waits for twice
 * the timeout to make sure the reply handler is not invoked a second time.
 */
@Test
public void testSendWithTimeoutNoTimeoutAfterReply() {
 final String payload = TestUtils.randomUnicodeString(1000);
 final long timeout = 1000;
 final AtomicBoolean replySeen = new AtomicBoolean();

 eb.<String>consumer(ADDRESS1).handler((Message<String> incoming) -> {
  assertEquals(payload, incoming.body());
  incoming.reply("a reply");
 });

 eb.send(ADDRESS1, payload, new DeliveryOptions().setSendTimeout(timeout), (AsyncResult<Message<Integer>> outcome) -> {
  // The handler must run exactly once, and successfully.
  assertFalse(replySeen.get());
  assertTrue(outcome.succeeded());
  replySeen.set(true);
  // Outlast the timeout before finishing, so a late second invocation would be caught above.
  vertx.setTimer(timeout * 2, timerId -> testComplete());
 });
 await();
}

代码示例来源:origin: eclipse-vertx/vert.x

// NOTE(review): this excerpt is abridged - closing braces and some intermediate lines are
// missing, so it does not compile as shown. str, val, options, received, consumer and R
// are declared outside the visible excerpt.
// Register a consumer on the second node: it checks the incoming body and replies with val,
// passing the delivery options along when they are present.
MessageConsumer<?> reg = vertices[1].eventBus().consumer(ADDRESS1).handler(msg -> {
 assertEquals(str, msg.body());
 if (options == null) {
  msg.reply(val);
 } else {
  msg.reply(val, options);
// Once the registration has completed, send from the first node and inspect the reply.
reg.completionHandler(ar -> {
 assertTrue(ar.succeeded());
 vertices[0].eventBus().send(ADDRESS1, str, onSuccess((Message<R> reply) -> {
  // With no custom consumer supplied, verify the reply body (and headers, if any) here.
  if (consumer == null) {
   assertTrue(reply.isSend());
   assertEquals(received, reply.body());
   if (options != null && options.getHeaders() != null) {
    assertNotNull(reply.headers());
    assertEquals(options.getHeaders().size(), reply.headers().size());
    // Every header configured in the options must come back with the same value.
    for (Map.Entry<String, String> entry: options.getHeaders().entries()) {
     assertEquals(reply.headers().get(entry.getKey()), entry.getValue());
   // NOTE(review): presumably the else branch of the null check above - the lines in between are elided.
   consumer.accept(reply.body());

代码示例来源:origin: vert-x3/vertx-examples

/**
 * Registers a consumer on the "hello" address that answers every message with a
 * greeting, and hands {@code startFuture} to the registration as its completion handler.
 *
 * @param startFuture receives the outcome of the consumer registration
 */
@Override
 public void start(Future<Void> startFuture) throws Exception {
  vertx.eventBus()
   .<String>consumer("hello", request -> {
    final String greeting = "Hello " + request.body() + " from " + ID;
    request.reply(greeting);
   })
   .completionHandler(startFuture);
 }
}

代码示例来源:origin: vert-x3/vertx-examples

/**
 * Subscribes to {@code verticleAddress}. For every incoming message the value stored
 * under "message_from_sender_verticle" is printed and a fixed JSON reply is sent back.
 */
private void registerHandler() {
    eventBus.<JsonObject>consumer(verticleAddress).handler(incoming -> {
      System.out.println(incoming.body().getValue("message_from_sender_verticle"));
      incoming.reply(new JsonObject().put("reply", "how interesting!"));
    });
  }
}

代码示例来源:origin: vert-x3/vertx-examples

/**
 * Saves the order carried in the message body into the "orders" collection and answers
 * the sender: an empty JSON object on success, failure code 500 with the cause's
 * message otherwise.
 *
 * @param msg event-bus message whose body is the order document
 */
private void placeOrder(Message<JsonObject> msg) {
 final JsonObject order = msg.body();
 mongo.save("orders", order, outcome -> {
  if (outcome.succeeded()) {
   msg.reply(new JsonObject());
  } else {
   msg.fail(500, outcome.cause().getMessage());
  }
 });
}

代码示例来源:origin: eclipse-vertx/vert.x

/**
 * Sends a JsonObject backed by a map, mutates that map right after the send, and
 * asserts one second later that the consumer's copy does not contain the new key.
 */
@Test
public void testChangesNotVisibleObject3() {
 final Map<String, Object> backing = new HashMap<>();
 final JsonObject payload = new JsonObject(backing);

 eb.<JsonObject>consumer("foo").handler((Message<JsonObject> delivered) ->
  vertx.setTimer(1000, timerId -> {
   assertFalse(delivered.body().containsKey("b"));
   testComplete();
  }));

 eb.send("foo", payload);
 // Mutate only after the send: this key must never become visible to the consumer.
 backing.put("b", "uhqdihuqwd");
 await();
}

代码示例来源:origin: vert-x3/vertx-web

/**
 * Writes a "send" frame for {@code address} over the websocket and blocks until a local
 * event-bus consumer on that address has received the same body and been unregistered.
 *
 * @param address event-bus address used both in the frame and for the local consumer
 * @param body payload placed in the frame and expected by the consumer
 * @param headers when true, the received message is also passed to {@code checkHeaders}
 * @throws Exception if waiting on the latch fails
 */
private void testSend(String address, Object body, boolean headers) throws Exception {
 final CountDownLatch done = new CountDownLatch(1);
 client.websocket(websocketURI, socket -> {
  final MessageConsumer<Object> receiver = vertx.eventBus().consumer(address);
  receiver.handler(delivered -> {
   final Object actualBody = delivered.body();
   assertEquals(body, actualBody);
   if (headers) {
    checkHeaders(delivered);
   }
   // Release the waiting test only once the consumer has been unregistered.
   receiver.unregister(unregistered -> done.countDown());
  });

  final String frameText = new JsonObject()
   .put("type", "send")
   .put("address", address)
   .put("body", body)
   .encode();
  socket.writeFrame(io.vertx.core.http.WebSocketFrame.textFrame(frameText, true));
 });
 awaitLatch(done);
}

代码示例来源:origin: xenv/gushici

/**
 * Handles the root route: builds the welcome/help JSON, requests the help list over the
 * event bus, and either returns the combined object or fails the routing context with
 * the send's cause.
 *
 * @param routingContext context of the current request
 */
private void handleRoot(RoutingContext routingContext) {
  final JsonObject payload = new JsonObject()
    .put("welcome", "欢迎使用古诗词·一言")
    .put("api-document", "下面为本API可用的所有类型,使用时,在链接最后面加上 .svg / .txt / .json / .png 可以获得不同格式的输出")
    .put("help", "具体安装方法请访问项目首页 " + config().getString("index.url", "http://localhost/"));

  vertx.eventBus().<JsonArray>send(Key.GET_HELP_FROM_REDIS, null, reply -> {
    if (reply.failed()) {
      routingContext.fail(reply.cause());
      return;
    }
    payload.put("list", reply.result().body());
    returnJsonWithCache(routingContext, payload);
  });
}

代码示例来源:origin: vert-x3/vertx-examples

/**
 * Sends a greeting JSON message to the "Consumer" address and prints the "reply" value
 * of the answer. A failed send is ignored.
 */
private void sendMessage() {
    final JsonObject outgoing = new JsonObject().put("message_from_sender_verticle", "hello consumer");
    eventBus.send("Consumer", outgoing, replyResult -> {
      if (!replyResult.succeeded()) {
        return;
      }
      final JsonObject answer = (JsonObject) replyResult.result().body();
      System.out.println("received reply: " + answer.getValue("reply"));
    });
  }
}

代码示例来源:origin: vert-x3/vertx-examples

/**
 * Starts an HTTP server on the configured "port". Each request forwards its "name"
 * parameter to the "hello" event-bus address and writes the reply body as the response;
 * a missing name yields 400 and a failed send yields 500 with the cause's message.
 *
 * @param startFuture completed once the server is listening, failed with the cause otherwise
 */
@Override
 public void start(Future<Void> startFuture) throws Exception {
  final HttpServerOptions serverOptions = new HttpServerOptions().setPort(config().getInteger("port"));

  vertx.createHttpServer(serverOptions).requestHandler(req -> {
   final String name = req.getParam("name");
   if (name == null) {
    req.response().setStatusCode(400).end("Missing name");
    return;
   }
   vertx.eventBus().<String>send("hello", name, reply -> {
    if (reply.failed()) {
     req.response().setStatusCode(500).end(reply.cause().getMessage());
    } else {
     req.response().end(reply.result().body());
    }
   });
  }).listen(listening -> {
   if (listening.failed()) {
    startFuture.fail(listening.cause());
   } else {
    startFuture.complete();
   }
  });
 }
}

代码示例来源:origin: eclipse-vertx/vert.x

/**
 * Deploys the same verticle twice, once per isolation group, while collecting the
 * counts that are published on the "testcounts" address.
 *
 * NOTE(review): this excerpt is truncated - the rest of the method body is not visible,
 * and count1, count2, latch and expectedSuccess are not used in the visible part.
 *
 * @param group1 isolation group used for the first deployment
 * @param group2 isolation group used for the second deployment
 * @param count1 not used in the visible excerpt - presumably the expected count for the first deployment
 * @param count2 not used in the visible excerpt - presumably the expected count for the second deployment
 * @param isolatedClasses passed to setIsolatedClasses for both deployments
 * @param verticleID identifier of the verticle that is deployed twice
 */
private void testIsolationGroup(String group1, String group2, int count1, int count2, List<String> isolatedClasses,
                String verticleID) throws Exception {
 // Records the "count" reported on "testcounts", keyed by the reported "deploymentID".
 Map<String, Integer> countMap = new ConcurrentHashMap<>();
 vertx.eventBus().<JsonObject>consumer("testcounts").handler((Message<JsonObject> msg) -> {
  countMap.put(msg.body().getString("deploymentID"), msg.body().getInteger("count"));
 });
 CountDownLatch latch = new CountDownLatch(1);
 // True only when the thread's context class loader is a URLClassLoader.
 boolean expectedSuccess = Thread.currentThread().getContextClassLoader() instanceof URLClassLoader;
 try {
  // First deployment, in group1.
  vertx.deployVerticle(verticleID, new DeploymentOptions().
   setIsolationGroup(group1).setIsolatedClasses(isolatedClasses), ar -> {
   assertTrue(ar.succeeded());
   deploymentID1.set(ar.result());
   assertEquals(0, TestVerticle.instanceCount.get());
   // Second deployment, in group2, started only after the first has succeeded.
   vertx.deployVerticle(verticleID,
    new DeploymentOptions().setIsolationGroup(group2).setIsolatedClasses(isolatedClasses), ar2 -> {
    assertTrue(ar2.succeeded());
    deploymentID2.set(ar2.result());
    assertEquals(0, TestVerticle.instanceCount.get());

代码示例来源:origin: eclipse-vertx/vert.x

/**
 * Sends with a timeout to a consumer that never replies and asserts that the reply
 * handler fails with a TIMEOUT ReplyException (code -1) no earlier than the timeout.
 */
@Test
public void testSendWithTimeoutNoReply() {
 final String payload = TestUtils.randomUnicodeString(1000);
 // The consumer only checks the body; it deliberately sends no reply.
 eb.<String>consumer(ADDRESS1).handler((Message<String> incoming) -> assertEquals(payload, incoming.body()));

 final long timeout = 1000;
 final long sentAt = System.currentTimeMillis();
 eb.send(ADDRESS1, payload, new DeliveryOptions().setSendTimeout(timeout), (AsyncResult<Message<Integer>> outcome) -> {
  final long failedAt = System.currentTimeMillis();
  assertFalse(outcome.succeeded());

  final Throwable cause = outcome.cause();
  assertTrue(cause instanceof ReplyException);
  final ReplyException timeoutFailure = (ReplyException) cause;
  assertEquals(-1, timeoutFailure.failureCode());
  assertEquals(ReplyFailure.TIMEOUT, timeoutFailure.failureType());

  assertTrue(failedAt - sentAt >= timeout);
  testComplete();
 });
 await();
}

代码示例来源:origin: xenv/gushici

/**
 * Replies to {@code message} with the help list read from Redis.
 *
 * <p>Each list element is parsed as a JSON object; every value is turned into a string,
 * its ":" characters are replaced by "/", and the configured "api.url" prefix
 * (default "http://localhost/") is prepended. If the Redis call fails, the error is
 * logged and the message is failed with code 500.
 *
 * @param message the event-bus message to reply to or fail
 */
private void getHelpFromRedis(Message message) {
  redisClient.lrange(Key.REDIS_HELP_LIST, 0, -1, res -> {
    if (res.failed()) {
      log.error("Fail to get data from Redis", res.cause());
      message.fail(500, res.cause().getMessage());
      return;
    }
    // Look the prefix up once per request instead of once per list element.
    final String prefix = config().getString("api.url", "http://localhost/");
    JsonArray array = res.result();
    JsonArray newArray = array.stream()
      .map(text -> new JsonObject((String) text).stream()
        .collect(Collectors.toMap(Map.Entry::getKey,
          v -> prefix + v.getValue().toString().replace(":", "/"))))
      .collect(JsonCollector.toJsonArray());
    message.reply(newArray);
  });
}

代码示例来源:origin: vert-x3/vertx-examples

/**
 * Fetches every document of the "albums" collection and replies with them as a
 * JsonArray; a failed lookup fails the message with code 500 and the cause's message.
 *
 * @param msg the event-bus message to answer
 */
private void listAlbums(Message<JsonObject> msg) {
 // An empty query object matches all documents in the collection.
 final JsonObject matchAll = new JsonObject();
 mongo.find("albums", matchAll, lookup -> {
  if (lookup.succeeded()) {
   // A JsonArray is easier to encode as the final response than the raw list.
   final JsonArray albums = new JsonArray();
   lookup.result().forEach(album -> albums.add(album));
   msg.reply(albums);
  } else {
   msg.fail(500, lookup.cause().getMessage());
  }
 });
}

代码示例来源:origin: eclipse-vertx/vert.x

/**
 * Starts two nodes, registers a consumer on the first and, once the registration has
 * completed, sends from the second; the test finishes when the expected body arrives.
 */
@Test
public void testRegisterRemote1() {
 startNodes(2);
 final String payload = TestUtils.randomUnicodeString(100);

 vertices[0].eventBus().<String>consumer(ADDRESS1)
  .handler((Message<String> incoming) -> {
   assertEquals(payload, incoming.body());
   testComplete();
  })
  .completionHandler(registered -> {
   assertTrue(registered.succeeded());
   // Send from the other node only after the registration has completed.
   vertices[1].eventBus().send(ADDRESS1, payload);
  });
 await();
}

代码示例来源:origin: xenv/gushici

/**
 * Replies with one random member of the Redis set selected by the requested format and
 * categories.
 *
 * <p>The lookup category list starts with "img" when the format is "png" and "json"
 * otherwise, followed by the requested categories. On failure the message is failed
 * exactly once: with the original code and message when the cause is a
 * {@link ReplyException}, otherwise with code 500 and the cause's message.
 *
 * @param message example: {format: "png", categories: [shenghuo, buyi]}
 */
private void getGushiciFromRedis(Message<JsonObject> message) {
  JsonArray realCategory = new JsonArray()
    .add("png".equals(message.body().getString("format")) ? "img" : "json")
    .addAll(message.body().getJsonArray("categories"));
  checkAndGetKey(realCategory)
    .compose(key -> Future.<String>future(s -> redisClient.srandmember(key, s))) // return one random member of the set
    .setHandler(res -> {
      if (res.succeeded()) {
        message.reply(res.result());
      } else if (res.cause() instanceof ReplyException) {
        // Propagate the upstream failure code; do not fall through to the generic 500 as well.
        ReplyException exception = (ReplyException) res.cause();
        message.fail(exception.failureCode(), exception.getMessage());
      } else {
        message.fail(500, res.cause().getMessage());
      }
    });
}

代码示例来源:origin: eclipse-vertx/vert.x

/**
 * Replies to a message that was sent without a reply handler; the test completes right
 * after the reply call.
 */
@Test
public void testReplyToSendWithNoReplyHandler() {
 eb.<String>consumer(ADDRESS1).handler((Message<String> incoming) -> {
  // The sender registered no reply handler for this message.
  incoming.reply("a reply");
  testComplete();
 });

 eb.send(ADDRESS1, "whatever");
 await();
}

代码示例来源:origin: eclipse-vertx/vert.x

/**
 * Sends with a timeout to a consumer that replies with 23 and asserts that the reply
 * handler succeeds and receives that value.
 */
@Test
public void testSendWithTimeoutNoTimeoutReply() {
 final String payload = TestUtils.randomUnicodeString(1000);
 eb.<String>consumer(ADDRESS1).handler((Message<String> incoming) -> {
  assertEquals(payload, incoming.body());
  incoming.reply(23);
 });

 final long timeout = 1000;
 eb.send(ADDRESS1, payload, new DeliveryOptions().setSendTimeout(timeout), (AsyncResult<Message<Integer>> outcome) -> {
  assertTrue(outcome.succeeded());
  final int replied = (int) outcome.result().body();
  assertEquals(23, replied);
  testComplete();
 });
 await();
}

代码示例来源:origin: vert-x3/vertx-examples

/**
 * Registers the "backend" consumer, which dispatches on the "op" field of the message:
 * "get" replies with the product stored under "id", "add" stores the given product under
 * "id" and replies with the result of addProduct, "list" replies with all products, and
 * anything else fails the message with code 0.
 */
@Override
public void start() throws Exception {
 vertx.eventBus().<JsonObject>consumer("backend", request -> {
  final JsonObject command = request.body();
  final String operation = command.getString("op", "");
  switch (operation) {
   case "get":
    request.reply(products.get(command.getString("id")));
    break;
   case "add": {
    final String newId = command.getString("id");
    final JsonObject newProduct = command.getJsonObject("product");
    // Stamp the id onto the product before handing it to addProduct.
    newProduct.put("id", newId);
    request.reply(addProduct(newProduct));
    break;
   }
   case "list": {
    final JsonArray all = new JsonArray();
    products.forEach((id, item) -> all.add(item));
    request.reply(all);
    break;
   }
   default:
    request.fail(0, "operation not permitted");
  }
 });
}

相关文章