Usage and code examples for the org.apache.qpid.proton.message.Message.setAddress() method

Reposted by x33g5p2x on 2022-01-25 in Other

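This article collects code examples for the Java method org.apache.qpid.proton.message.Message.setAddress() and shows how Message.setAddress() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects as practical reference material. Details of Message.setAddress():
Package: org.apache.qpid.proton.message
Class name: Message
Method name: setAddress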

About Message.setAddress

No description available.

Code examples

Code example source: org.apache.qpid/proton-j-impl

private void restoreMessage(Message m)
{
  m.setAddress(_original);
}

Code example source: com.microsoft.azure.iot/proton-j-azure-iot

private void restoreMessage(Message m)
{
  m.setAddress(_original);
}

Code example source: io.vertx/vertx-proton

/**
 * Creates a Message object with the given String contained as an AmqpValue body, and the 'to' address set as given.
 *
 * @param address
 *          the 'to' address to set
 * @param body
 *          the string to set as an AmqpValue body
 * @return the message
 */
public static Message message(String address, String body) {
 Message value = message(body);
 value.setAddress(address);
 return value;
}

Code example source: EnMasseProject/enmasse

public Future<Integer> sendMessages(String address, List<String> messages, Predicate<Message> predicate) {
  List<Message> messageList = messages.stream()
      .map(body -> {
        Message message = Message.Factory.create();
        message.setBody(new AmqpValue(body));
        message.setAddress(address);
        return message;
      })
      .collect(Collectors.toList());
  return sendMessages(address, messageList, predicate);
}

Code example source: org.apache.qpid/proton-j-impl

private void rewriteMessage(Message m)
{
  _original = m.getAddress();
  if (_rewrites.apply(_original)) {
    m.setAddress(_rewrites.result());
  } else {
    m.setAddress(defaultRewrite(_original));
  }
}

Code example source: com.microsoft.azure.iot/proton-j-azure-iot

private void rewriteMessage(Message m)
{
  _original = m.getAddress();
  if (_rewrites.apply(_original)) {
    m.setAddress(_rewrites.result());
  } else {
    m.setAddress(defaultRewrite(_original));
  }
}
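
The rewriteMessage and restoreMessage snippets above follow a save, rewrite, restore pattern: the original address is remembered, a rewritten address is set for the duration of an operation, and the original is put back afterwards. The following is a minimal sketch of that pattern only. SimpleMessage is a hypothetical stand-in that models nothing but getAddress/setAddress (it is not the proton-j Message class), and the rewrite rule passed as a UnaryOperator is an assumption that replaces the _rewrites and defaultRewrite logic, which the snippets do not show.

```java
import java.util.function.UnaryOperator;

class AddressRewriteSketch {

    /** Hypothetical stand-in for the address part of a message; not the proton-j class. */
    static final class SimpleMessage {
        private String address;

        String getAddress() {
            return address;
        }

        void setAddress(String address) {
            this.address = address;
        }
    }

    // Address saved by rewrite() so that restore() can put it back.
    private String original;

    /** Remembers the current address, then replaces it with the rewritten one. */
    void rewrite(SimpleMessage m, UnaryOperator<String> rule) {
        original = m.getAddress();
        m.setAddress(rule.apply(original));
    }

    /** Puts back the address that was saved by the last rewrite() call. */
    void restore(SimpleMessage m) {
        m.setAddress(original);
    }

    public static void main(String[] args) {
        AddressRewriteSketch sketch = new AddressRewriteSketch();
        SimpleMessage m = new SimpleMessage();
        m.setAddress("amqp://user:secret@host/queue");

        // Illustrative rule (an assumption): hide the credentials while the message is in use.
        sketch.rewrite(m, a -> a.replace("user:secret@", ""));
        System.out.println(m.getAddress());

        sketch.restore(m);
        System.out.println(m.getAddress());
    }
}
```

Keeping the saved address in a single field, as the original snippets do with _original, means the sketch is only safe when one message is rewritten and restored at a time.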

Code example source: stackoverflow.com

Messenger mng = Proton.messenger();
mng.start();
mng.subscribe("amqp://~xxx.xxx.xxx.xxx");

Message msg = Proton.message();
msg.setAddress("amqp://yyy.yyy.yyy.yyy");
....

Code example source: eclipse/hono

private AmqpContext createContext(
    final ResourceIdentifier validatedAddress,
    final ProtonDelivery delivery,
    final Message message,
    final Device authenticatedDevice) {
  final String to = validatedAddress.toString();
  if (!to.equals(message.getAddress())) {
    LOG.debug("adjusting message's address [orig: {}, updated: {}]", message.getAddress(), to);
    message.setAddress(to);
  }
  return new AmqpContext(delivery, message, authenticatedDevice);
}

Code example source: org.eclipse.hono/hono-client

@Override
public final Future<ProtonDelivery> send(final String deviceId, final Map<String, ?> properties, final byte[] payload, final String contentType,
             final String registrationAssertion) {
  Objects.requireNonNull(deviceId);
  Objects.requireNonNull(payload);
  Objects.requireNonNull(contentType);
  Objects.requireNonNull(registrationAssertion);
  final Message msg = ProtonHelper.message();
  msg.setAddress(getTo(deviceId));
  MessageHelper.setPayload(msg, contentType, payload);
  setApplicationProperties(msg, properties);
  addProperties(msg, deviceId, registrationAssertion);
  return send(msg);
}

Code example source: apache/activemq-artemis

/**
 * Sets the address which is applied to the AMQP message To field in the message properties
 *
 * @param address The address that should be applied in the Message To field.
 */
public void setAddress(String address) {
 checkReadOnly();
 lazyCreateProperties();
 getWrappedMessage().setAddress(address);
}
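
The wrapper above checks a guard and prepares the properties section before it delegates to setAddress. The source does not show checkReadOnly() or lazyCreateProperties(), so the following sketch is only a guess at what such helpers might do, based on their names: reject writes once the message is read-only, and create the properties holder on first write. Props, markReadOnly, and every other name in it are hypothetical stand-ins, not ActiveMQ Artemis or proton-j classes.

```java
class ReadOnlyGuardSketch {

    /** Hypothetical stand-in for the properties section; only the To field is modelled. */
    static final class Props {
        String to;
    }

    private Props properties;   // created lazily on the first write
    private boolean readOnly;   // once true, setters are rejected

    void markReadOnly() {
        readOnly = true;
    }

    /** Stores the address in the To field, creating the properties holder if needed. */
    void setAddress(String address) {
        if (readOnly) {
            throw new IllegalStateException("message is read-only");
        }
        if (properties == null) {
            properties = new Props();
        }
        properties.to = address;
    }

    /** Returns the To field, or null when no properties have been created yet. */
    String getAddress() {
        return properties == null ? null : properties.to;
    }
}
```

Creating the holder lazily keeps a message that never sets an address free of an empty properties section, which is one plausible reason for a helper with that name.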

Code example source: org.eclipse.hono/hono-client

@Override
@Deprecated
public final Future<ProtonDelivery> send(final String deviceId, final Map<String, ?> properties,
    final byte[] payload, final String contentType, final String registrationAssertion,
    final Handler<Void> capacityAvailableHandler) {
  Objects.requireNonNull(deviceId);
  Objects.requireNonNull(payload);
  Objects.requireNonNull(contentType);
  Objects.requireNonNull(registrationAssertion);
  final Message msg = ProtonHelper.message();
  msg.setAddress(getTo(deviceId));
  MessageHelper.setPayload(msg, contentType, payload);
  setApplicationProperties(msg, properties);
  addProperties(msg, deviceId, registrationAssertion);
  return send(msg, capacityAvailableHandler);
}

Code example source: EnMasseProject/enmasse

@Override
public List<String> getQueueNames(AmqpClient queueClient, Destination replyQueue, String topic) throws Exception {
  Message requestMessage = Message.Factory.create();
  Map<String, Object> appProperties = new HashMap<>();
  appProperties.put(resourceProperty, "address." + topic);
  appProperties.put(operationProperty, "getQueueNames");
  requestMessage.setAddress(managementAddress);
  requestMessage.setApplicationProperties(new ApplicationProperties(appProperties));
  requestMessage.setReplyTo(replyQueue.getAddress());
  requestMessage.setBody(new AmqpValue("[]"));
  Future<Integer> sent = queueClient.sendMessages(managementAddress, requestMessage);
  assertThat(String.format("Sender failed, expected %d messages", 1), sent.get(30, TimeUnit.SECONDS), is(1));
  log.info("request sent");
  Future<List<Message>> received = queueClient.recvMessages(replyQueue.getAddress(), 1);
  assertThat(String.format("Receiver failed, expected %d messages", 1),
      received.get(30, TimeUnit.SECONDS).size(), is(1));
  AmqpValue val = (AmqpValue) received.get().get(0).getBody();
  log.info("answer received: " + val.toString());
  String queues = val.getValue().toString();
  queues = queues.replaceAll("\\[|]|\"", "");
  return Arrays.asList(queues.split(","));
}

Code example source: org.eclipse.hono/hono-client

private static Message createResponseMessage(
    final String targetAddress,
    final String correlationId,
    final String contentType,
    final Buffer payload,
    final Map<String, Object> properties,
    final int status) {
  Objects.requireNonNull(targetAddress);
  Objects.requireNonNull(correlationId);
  final Message msg = ProtonHelper.message();
  msg.setCorrelationId(correlationId);
  msg.setAddress(targetAddress);
  MessageHelper.setPayload(msg, contentType, payload);
  if (properties != null) {
    msg.setApplicationProperties(new ApplicationProperties(properties));
  }
  MessageHelper.setCreationTime(msg);
  MessageHelper.addProperty(msg, MessageHelper.APP_PROPERTY_STATUS, status);
  return msg;
}

Code example source: strimzi/strimzi-kafka-bridge

@Override
public Message toMessage(String address, KafkaConsumerRecord<String, byte[]> record) {
  
  Message message = Proton.message();
  message.setAddress(address);
  
  message.decode(record.value(), 0, record.value().length);
  
  // put message annotations about partition, offset and key (if not null)
  Map<Symbol, Object> map = new HashMap<>();
  map.put(Symbol.valueOf(AmqpBridge.AMQP_PARTITION_ANNOTATION), record.partition());
  map.put(Symbol.valueOf(AmqpBridge.AMQP_OFFSET_ANNOTATION), record.offset());
  map.put(Symbol.valueOf(AmqpBridge.AMQP_KEY_ANNOTATION), record.key());
  map.put(Symbol.valueOf(AmqpBridge.AMQP_TOPIC_ANNOTATION), record.topic());
  
  MessageAnnotations messageAnnotations = new MessageAnnotations(map);
  message.setMessageAnnotations(messageAnnotations);
  
  return message;
}

Code example source: strimzi/strimzi-kafka-bridge

@Override
public Message toMessage(String address, KafkaConsumerRecord<String, byte[]> record) {
  
  Message message = Proton.message();
  message.setAddress(address);
  
  // put message annotations about partition, offset and key (if not null)
  Map<Symbol, Object> map = new HashMap<>();
  map.put(Symbol.valueOf(AmqpBridge.AMQP_PARTITION_ANNOTATION), record.partition());
  map.put(Symbol.valueOf(AmqpBridge.AMQP_OFFSET_ANNOTATION), record.offset());
  map.put(Symbol.valueOf(AmqpBridge.AMQP_KEY_ANNOTATION), record.key());
  map.put(Symbol.valueOf(AmqpBridge.AMQP_TOPIC_ANNOTATION), record.topic());
  
  MessageAnnotations messageAnnotations = new MessageAnnotations(map);
  message.setMessageAnnotations(messageAnnotations);
  
  message.setBody(new Data(new Binary(record.value())));
  
  return message;
}

Code example source: EnMasseProject/enmasse

/**
 * Return a raw AMQP message
 *
 * @return
 */
public Message toAmqp() {
  Message message = ProtonHelper.message();
  message.setMessageId(this.messageId);
  Map<Symbol, Object> map = new HashMap<>();
  map.put(Symbol.valueOf(AMQP_RETAIN_ANNOTATION), this.isRetain);
  map.put(Symbol.valueOf(AMQP_QOS_ANNOTATION), this.qos.value());
  MessageAnnotations messageAnnotations = new MessageAnnotations(map);
  message.setMessageAnnotations(messageAnnotations);
  message.setAddress(this.topic);
  Header header = new Header();
  header.setDurable(this.qos != MqttQoS.AT_MOST_ONCE);
  message.setHeader(header);
  message.setDeliveryCount(this.isDup ? 1 : 0);
  // the payload could be null (or empty)
  if (this.payload != null)
    message.setBody(new Data(new Binary(this.payload.getBytes())));
  return message;
}

Code example source: org.apache.beam/beam-sdks-java-io-amqp

@Test
public void encodeDecodeLargeMessage() throws Exception {
 Message message = Message.Factory.create();
 message.setAddress("address");
 message.setSubject("subject");
 String body = Joiner.on("").join(Collections.nCopies(32 * 1024 * 1024, " "));
 message.setBody(new AmqpValue(body));

 AmqpMessageCoder coder = AmqpMessageCoder.of();

 Message clone = CoderUtils.clone(coder, message);

 // The decoded body must match the body that was encoded.
 assertEquals(message.getBody().toString(), clone.getBody().toString());
}

Code example source: org.apache.beam/beam-sdks-java-io-amqp

@Test
public void encodeDecodeTooMuchLargerMessage() throws Exception {
 thrown.expect(CoderException.class);
 Message message = Message.Factory.create();
 message.setAddress("address");
 message.setSubject("subject");
 String body = Joiner.on("").join(Collections.nCopies(64 * 1024 * 1024, " "));
 message.setBody(new AmqpValue(body));
 AmqpMessageCoder coder = AmqpMessageCoder.of();
 byte[] encoded = CoderUtils.encodeToByteArray(coder, message);
}

Code example source: org.apache.beam/beam-sdks-java-io-amqp

@Test
public void encodeDecode() throws Exception {
 Message message = Message.Factory.create();
 message.setBody(new AmqpValue("body"));
 message.setAddress("address");
 message.setSubject("test");
 AmqpMessageCoder coder = AmqpMessageCoder.of();
 Message clone = CoderUtils.clone(coder, message);
 assertEquals("AmqpValue{body}", clone.getBody().toString());
 assertEquals("address", clone.getAddress());
 assertEquals("test", clone.getSubject());
}

Code example source: org.apache.beam/beam-sdks-java-io-amqp

@Test
public void testRead() throws Exception {
 PCollection<Message> output =
   pipeline.apply(
     AmqpIO.read()
       .withMaxNumRecords(100)
       .withAddresses(Collections.singletonList(broker.getQueueUri("testRead"))));
 PAssert.thatSingleton(output.apply(Count.globally())).isEqualTo(100L);
 Messenger sender = Messenger.Factory.create();
 sender.start();
 for (int i = 0; i < 100; i++) {
  Message message = Message.Factory.create();
  message.setAddress(broker.getQueueUri("testRead"));
  message.setBody(new AmqpValue("Test " + i));
  sender.put(message);
  sender.send();
 }
 sender.stop();
 pipeline.run();
}
