本文整理了Java中org.apache.qpid.proton.message.Message.setAddress()
方法的一些代码示例,展示了Message.setAddress()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Message.setAddress()
方法的具体详情如下:
包路径:org.apache.qpid.proton.message.Message
类名称:Message
方法名:setAddress
暂无
代码示例来源:origin: org.apache.qpid/proton-j-impl
private void restoreMessage(Message m)
{
m.setAddress(_original);
}
代码示例来源:origin: com.microsoft.azure.iot/proton-j-azure-iot
private void restoreMessage(Message m)
{
m.setAddress(_original);
}
代码示例来源:origin: io.vertx/vertx-proton
/**
* Creates a Message object with the given String contained as an AmqpValue body, and the 'to' address set as given.
*
* @param address
* the 'to' address to set
* @param body
* the string to set as an AmqpValue body
* @return the message
*/
public static Message message(String address, String body) {
Message value = message(body);
value.setAddress(address);
return value;
}
代码示例来源:origin: EnMasseProject/enmasse
public Future<Integer> sendMessages(String address, List<String> messages, Predicate<Message> predicate) {
List<Message> messageList = messages.stream()
.map(body -> {
Message message = Message.Factory.create();
message.setBody(new AmqpValue(body));
message.setAddress(address);
return message;
})
.collect(Collectors.toList());
return sendMessages(address, messageList, predicate);
}
代码示例来源:origin: org.apache.qpid/proton-j-impl
private void rewriteMessage(Message m)
{
_original = m.getAddress();
if (_rewrites.apply(_original)) {
m.setAddress(_rewrites.result());
} else {
m.setAddress(defaultRewrite(_original));
}
}
代码示例来源:origin: com.microsoft.azure.iot/proton-j-azure-iot
private void rewriteMessage(Message m)
{
_original = m.getAddress();
if (_rewrites.apply(_original)) {
m.setAddress(_rewrites.result());
} else {
m.setAddress(defaultRewrite(_original));
}
}
代码示例来源:origin: stackoverflow.com
Messenger mng = Proton.messenger();
mng.start();
mng.subscribe("amqp://~xxx.xxx.xxx.xxx");
Message msg = Proton.message();
msg.setAddress("amqp://yyy.yyy.yyy.yyy");
....
代码示例来源:origin: eclipse/hono
private AmqpContext createContext(
final ResourceIdentifier validatedAddress,
final ProtonDelivery delivery,
final Message message,
final Device authenticatedDevice) {
final String to = validatedAddress.toString();
if (!to.equals(message.getAddress())) {
LOG.debug("adjusting message's address [orig: {}, updated: {}]", message.getAddress(), to);
message.setAddress(to);
}
return new AmqpContext(delivery, message, authenticatedDevice);
}
代码示例来源:origin: org.eclipse.hono/hono-client
@Override
public final Future<ProtonDelivery> send(final String deviceId, final Map<String, ?> properties, final byte[] payload, final String contentType,
final String registrationAssertion) {
Objects.requireNonNull(deviceId);
Objects.requireNonNull(payload);
Objects.requireNonNull(contentType);
Objects.requireNonNull(registrationAssertion);
final Message msg = ProtonHelper.message();
msg.setAddress(getTo(deviceId));
MessageHelper.setPayload(msg, contentType, payload);
setApplicationProperties(msg, properties);
addProperties(msg, deviceId, registrationAssertion);
return send(msg);
}
代码示例来源:origin: apache/activemq-artemis
/**
* Sets the address which is applied to the AMQP message To field in the message properties
*
* @param address The address that should be applied in the Message To field.
*/
public void setAddress(String address) {
checkReadOnly();
lazyCreateProperties();
getWrappedMessage().setAddress(address);
}
代码示例来源:origin: org.eclipse.hono/hono-client
@Override
@Deprecated
public final Future<ProtonDelivery> send(final String deviceId, final Map<String, ?> properties,
final byte[] payload, final String contentType, final String registrationAssertion,
final Handler<Void> capacityAvailableHandler) {
Objects.requireNonNull(deviceId);
Objects.requireNonNull(payload);
Objects.requireNonNull(contentType);
Objects.requireNonNull(registrationAssertion);
final Message msg = ProtonHelper.message();
msg.setAddress(getTo(deviceId));
MessageHelper.setPayload(msg, contentType, payload);
setApplicationProperties(msg, properties);
addProperties(msg, deviceId, registrationAssertion);
return send(msg, capacityAvailableHandler);
}
代码示例来源:origin: EnMasseProject/enmasse
@Override
public List<String> getQueueNames(AmqpClient queueClient, Destination replyQueue, String topic) throws Exception {
Message requestMessage = Message.Factory.create();
Map<String, Object> appProperties = new HashMap<>();
appProperties.put(resourceProperty, "address." + topic);
appProperties.put(operationProperty, "getQueueNames");
requestMessage.setAddress(managementAddress);
requestMessage.setApplicationProperties(new ApplicationProperties(appProperties));
requestMessage.setReplyTo(replyQueue.getAddress());
requestMessage.setBody(new AmqpValue("[]"));
Future<Integer> sent = queueClient.sendMessages(managementAddress, requestMessage);
assertThat(String.format("Sender failed, expected %d messages", 1), sent.get(30, TimeUnit.SECONDS), is(1));
log.info("request sent");
Future<List<Message>> received = queueClient.recvMessages(replyQueue.getAddress(), 1);
assertThat(String.format("Receiver failed, expected %d messages", 1),
received.get(30, TimeUnit.SECONDS).size(), is(1));
AmqpValue val = (AmqpValue) received.get().get(0).getBody();
log.info("answer received: " + val.toString());
String queues = val.getValue().toString();
queues = queues.replaceAll("\\[|]|\"", "");
return Arrays.asList(queues.split(","));
}
代码示例来源:origin: org.eclipse.hono/hono-client
private static Message createResponseMessage(
final String targetAddress,
final String correlationId,
final String contentType,
final Buffer payload,
final Map<String, Object> properties,
final int status) {
Objects.requireNonNull(targetAddress);
Objects.requireNonNull(correlationId);
final Message msg = ProtonHelper.message();
msg.setCorrelationId(correlationId);
msg.setAddress(targetAddress);
MessageHelper.setPayload(msg, contentType, payload);
if (properties != null) {
msg.setApplicationProperties(new ApplicationProperties(properties));
}
MessageHelper.setCreationTime(msg);
MessageHelper.addProperty(msg, MessageHelper.APP_PROPERTY_STATUS, status);
return msg;
}
代码示例来源:origin: strimzi/strimzi-kafka-bridge
@Override
public Message toMessage(String address, KafkaConsumerRecord<String, byte[]> record) {
Message message = Proton.message();
message.setAddress(address);
message.decode(record.value(), 0, record.value().length);
// put message annotations about partition, offset and key (if not null)
Map<Symbol, Object> map = new HashMap<>();
map.put(Symbol.valueOf(AmqpBridge.AMQP_PARTITION_ANNOTATION), record.partition());
map.put(Symbol.valueOf(AmqpBridge.AMQP_OFFSET_ANNOTATION), record.offset());
map.put(Symbol.valueOf(AmqpBridge.AMQP_KEY_ANNOTATION), record.key());
map.put(Symbol.valueOf(AmqpBridge.AMQP_TOPIC_ANNOTATION), record.topic());
MessageAnnotations messageAnnotations = new MessageAnnotations(map);
message.setMessageAnnotations(messageAnnotations);
return message;
}
代码示例来源:origin: strimzi/strimzi-kafka-bridge
@Override
public Message toMessage(String address, KafkaConsumerRecord<String, byte[]> record) {
Message message = Proton.message();
message.setAddress(address);
// put message annotations about partition, offset and key (if not null)
Map<Symbol, Object> map = new HashMap<>();
map.put(Symbol.valueOf(AmqpBridge.AMQP_PARTITION_ANNOTATION), record.partition());
map.put(Symbol.valueOf(AmqpBridge.AMQP_OFFSET_ANNOTATION), record.offset());
map.put(Symbol.valueOf(AmqpBridge.AMQP_KEY_ANNOTATION), record.key());
map.put(Symbol.valueOf(AmqpBridge.AMQP_TOPIC_ANNOTATION), record.topic());
MessageAnnotations messageAnnotations = new MessageAnnotations(map);
message.setMessageAnnotations(messageAnnotations);
message.setBody(new Data(new Binary(record.value())));
return message;
}
代码示例来源:origin: EnMasseProject/enmasse
/**
* Return a raw AMQP message
*
* @return
*/
public Message toAmqp() {
Message message = ProtonHelper.message();
message.setMessageId(this.messageId);
Map<Symbol, Object> map = new HashMap<>();
map.put(Symbol.valueOf(AMQP_RETAIN_ANNOTATION), this.isRetain);
map.put(Symbol.valueOf(AMQP_QOS_ANNOTATION), this.qos.value());
MessageAnnotations messageAnnotations = new MessageAnnotations(map);
message.setMessageAnnotations(messageAnnotations);
message.setAddress(this.topic);
Header header = new Header();
header.setDurable(this.qos != MqttQoS.AT_MOST_ONCE);
message.setHeader(header);
message.setDeliveryCount(this.isDup ? 1 : 0);
// the payload could be null (or empty)
if (this.payload != null)
message.setBody(new Data(new Binary(this.payload.getBytes())));
return message;
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-io-amqp
@Test
public void encodeDecodeLargeMessage() throws Exception {
Message message = Message.Factory.create();
message.setAddress("address");
message.setSubject("subject");
String body = Joiner.on("").join(Collections.nCopies(32 * 1024 * 1024, " "));
message.setBody(new AmqpValue(body));
AmqpMessageCoder coder = AmqpMessageCoder.of();
Message clone = CoderUtils.clone(coder, message);
clone.getBody().toString().equals(message.getBody().toString());
}
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-io-amqp
@Test
public void encodeDecodeTooMuchLargerMessage() throws Exception {
thrown.expect(CoderException.class);
Message message = Message.Factory.create();
message.setAddress("address");
message.setSubject("subject");
String body = Joiner.on("").join(Collections.nCopies(64 * 1024 * 1024, " "));
message.setBody(new AmqpValue(body));
AmqpMessageCoder coder = AmqpMessageCoder.of();
byte[] encoded = CoderUtils.encodeToByteArray(coder, message);
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-io-amqp
@Test
public void encodeDecode() throws Exception {
Message message = Message.Factory.create();
message.setBody(new AmqpValue("body"));
message.setAddress("address");
message.setSubject("test");
AmqpMessageCoder coder = AmqpMessageCoder.of();
Message clone = CoderUtils.clone(coder, message);
assertEquals("AmqpValue{body}", clone.getBody().toString());
assertEquals("address", clone.getAddress());
assertEquals("test", clone.getSubject());
}
代码示例来源:origin: org.apache.beam/beam-sdks-java-io-amqp
@Test
public void testRead() throws Exception {
PCollection<Message> output =
pipeline.apply(
AmqpIO.read()
.withMaxNumRecords(100)
.withAddresses(Collections.singletonList(broker.getQueueUri("testRead"))));
PAssert.thatSingleton(output.apply(Count.globally())).isEqualTo(100L);
Messenger sender = Messenger.Factory.create();
sender.start();
for (int i = 0; i < 100; i++) {
Message message = Message.Factory.create();
message.setAddress(broker.getQueueUri("testRead"));
message.setBody(new AmqpValue("Test " + i));
sender.put(message);
sender.send();
}
sender.stop();
pipeline.run();
}
内容来源于网络,如有侵权,请联系作者删除!