org.apache.qpid.proton.message.Message.getAddress()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(6.9k)|赞(0)|评价(0)|浏览(158)

本文整理了Java中org.apache.qpid.proton.message.Message.getAddress()方法的一些代码示例,展示了Message.getAddress()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Message.getAddress()方法的具体详情如下:
包路径:org.apache.qpid.proton.message.Message
类名称:Message
方法名:getAddress

Message.getAddress介绍

返回 AMQP 消息属性(properties)中的 "to" 字段,即消息的目标地址;如果未设置地址,则返回 null。

代码示例

代码示例来源:origin: vert-x3/vertx-examples

String address = remoteTarget.getAddress() ;
if (address == null) {
 address = msg.getAddress();

代码示例来源:origin: eclipse/hono

/**
 * Creates the context for a message whose target address has already been validated.
 * <p>
 * If the address carried by the message differs from the string form of the validated
 * address, the message's address is overwritten with the validated one before the
 * context is created.
 *
 * @param validatedAddress The validated address; its string form becomes the message address.
 * @param delivery The delivery of the message.
 * @param message The message to create the context for.
 * @param authenticatedDevice The authenticated device, passed through to the context unchanged.
 * @return The newly created context.
 */
private AmqpContext createContext(
    final ResourceIdentifier validatedAddress,
    final ProtonDelivery delivery,
    final Message message,
    final Device authenticatedDevice) {
  final String canonicalAddress = validatedAddress.toString();
  final String currentAddress = message.getAddress();
  final boolean addressMatches = canonicalAddress.equals(currentAddress);

  if (!addressMatches) {
    // Align the message with the validated address so later stages see a consistent target.
    LOG.debug("adjusting message's address [orig: {}, updated: {}]", currentAddress, canonicalAddress);
    message.setAddress(canonicalAddress);
  }

  return new AmqpContext(delivery, message, authenticatedDevice);
}

代码示例来源:origin: eclipse/hono

/**
 * Creates an AmqpContext instance using the specified delivery, message and authenticated device.
 * <p>
 * This constructor <b>does not</b> validate the message address. It is the responsibility of the caller to make
 * sure that the message address is valid i.e matches the pattern {@code endpointName/tenantId/deviceId}.
 * 
 * @param delivery The delivery of the message.
 * @param message The AMQP 1.0 message. The message must contain a valid address.
 * @param authenticatedDevice The device that authenticates to the adapter or {@code null} if the device is unauthenticated.
 * @throws NullPointerException if the delivery or message is null.
 */
AmqpContext(final ProtonDelivery delivery, final Message message, final Device authenticatedDevice) {
  this.delivery = Objects.requireNonNull(delivery);
  this.message = Objects.requireNonNull(message);
  this.authenticatedDevice = authenticatedDevice;
  this.resource = ResourceIdentifier.fromString(message.getAddress());
  this.payload = MessageHelper.getPayload(message);
}

代码示例来源:origin: strimzi/strimzi-kafka-bridge

String topic = (message.getAddress() == null) ?
    kafkaTopic :
    message.getAddress().replace('/', '.');

代码示例来源:origin: org.apache.qpid/proton

URI address = new URI(m.getAddress());
if (address.getHost() == null)
  throw new MessengerException("unable to send to address: " + m.getAddress());
throw new MessengerException("Invalid address: " + m.getAddress(), e);

代码示例来源:origin: org.eclipse.hono/hono-server

/**
 * Processes a telemetry message received from an upstream client.
 * <p>
 * Makes sure a link status object exists for the client's link, logs the message,
 * hands the message to the registered consumer (if any), accepts the delivery and
 * finally notifies the link status that a message has been received.
 *
 * @param client The upstream client the message was received from.
 * @param delivery The delivery to accept.
 * @param data The received message.
 */
@Override
public void processMessage(final UpstreamReceiver client, final ProtonDelivery delivery, final Message data) {
  final LinkStatus linkStatus = resolveLinkStatus(client);

  LOG.debug("processing telemetry data [id: {}, to: {}, content-type: {}]", data.getMessageId(), data.getAddress(),
      data.getContentType());

  if (messageConsumer != null) {
    messageConsumer.accept(data);
  }

  ProtonHelper.accepted(delivery, true);
  linkStatus.onMsgReceived();
}

/**
 * Returns the link status registered for the client's link, creating and registering
 * a new one if none exists yet.
 */
private LinkStatus resolveLinkStatus(final UpstreamReceiver client) {
  final LinkStatus existing = statusMap.get(client.getLinkId());
  if (existing != null) {
    return existing;
  }
  LOG.debug("creating new link status object [{}]", client.getLinkId());
  final LinkStatus created = new LinkStatus(client);
  statusMap.put(client.getLinkId(), created);
  return created;
}

代码示例来源:origin: org.apache.qpid/proton-j-impl

/**
 * Rewrites the address of the given message.
 * <p>
 * The current address is remembered in {@code _original}. If the configured rewrite
 * rules apply to it, their result becomes the new address; otherwise the default
 * rewrite of the original address is used.
 *
 * @param m the message whose address is rewritten in place
 */
private void rewriteMessage(Message m)
{
  _original = m.getAddress();
  final boolean ruleMatched = _rewrites.apply(_original);
  m.setAddress(ruleMatched ? _rewrites.result() : defaultRewrite(_original));
}

代码示例来源:origin: com.microsoft.azure.iot/proton-j-azure-iot

/**
 * Rewrites the address of the given message.
 * <p>
 * The current address is remembered in {@code _original}. If the configured rewrite
 * rules apply to it, their result becomes the new address; otherwise the default
 * rewrite of the original address is used.
 *
 * @param m the message whose address is rewritten in place
 */
private void rewriteMessage(Message m)
{
  _original = m.getAddress();
  final boolean ruleMatched = _rewrites.apply(_original);
  m.setAddress(ruleMatched ? _rewrites.result() : defaultRewrite(_original));
}

代码示例来源:origin: EnMasseProject/enmasse

String topic = message.getAddress();

代码示例来源:origin: com.microsoft.azure.iot/proton-j-azure-iot

StoreEntry entry = _outgoingStore.put( m.getAddress() );
_outgoingTracker = new TrackerImpl(TrackerImpl.Type.OUTGOING,
                  _outgoingStore.trackEntry(entry));
String routedAddress = routeAddress(m.getAddress());
Address address = new Address(routedAddress);
if (address.getHost() == null)
pumpOut(m.getAddress(), sender);

代码示例来源:origin: EnMasseProject/enmasse

String topic = message.getAddress();

代码示例来源:origin: strimzi/strimzi-kafka-bridge

String topic = (message.getAddress() == null) ?
    kafkaTopic :
    message.getAddress().replace('/', '.');

代码示例来源:origin: org.eclipse.hono/hono-server

msg.getMessageId(), msg.getAddress(), msg.getContentType(), getDownstreamContainer(), sender.getCredit(), sender.getQueued());
forwardMessage(sender, msg, delivery);

代码示例来源:origin: Azure/azure-event-hubs-java

// Copy each optional AMQP property into the receive properties only when it is set,
// so that absent properties do not show up as null-valued entries.
if (amqpMessage.getUserId() != null) {
  receiveProperties.put(AmqpConstants.AMQP_PROPERTY_USER_ID, amqpMessage.getUserId());
}
if (amqpMessage.getAddress() != null) {
  receiveProperties.put(AmqpConstants.AMQP_PROPERTY_TO, amqpMessage.getAddress());
}
if (amqpMessage.getSubject() != null) {
  receiveProperties.put(AmqpConstants.AMQP_PROPERTY_SUBJECT, amqpMessage.getSubject());
}

代码示例来源:origin: org.apache.qpid/proton-j-impl

StoreEntry entry = _outgoingStore.put( m.getAddress() );
_outgoingTracker = new TrackerImpl(TrackerImpl.Type.OUTGOING,
                  _outgoingStore.trackEntry(entry));
String routedAddress = routeAddress(m.getAddress());
Address address = new Address(routedAddress);
if (address.getHost() == null)
int port = Integer.valueOf(ports);
Sender sender = getLink(address.getHost(), port, new SenderFinder(address.getName()));
pumpOut(m.getAddress(), sender);

代码示例来源:origin: EnMasseProject/enmasse

MqttQoS qos = MqttQoS.AT_MOST_ONCE;
String topic = message.getAddress();

代码示例来源:origin: EnMasseProject/enmasse

String topic = message.getAddress();

代码示例来源:origin: io.vertx/vertx-proton

/**
 * Sends the given message as a new delivery identified by the given tag.
 * <p>
 * The message is encoded into a buffer and handed to the sender without copying.
 * If the link's sender settle mode is {@code SETTLED}, the delivery is settled
 * right away. When an update handler is supplied, it is registered on the returned
 * delivery and the configured auto-settle setting is applied; without a handler the
 * returned delivery always auto-settles.
 *
 * @param tag the delivery tag
 * @param message the message to send; it is cast to {@code MessageImpl} for encoding
 * @param onUpdated handler for delivery updates, or {@code null} if none
 * @return the delivery wrapping the sent message
 * @throws IllegalArgumentException if this is an anonymous sender and the message has no address
 */
@Override
public ProtonDelivery send(byte[] tag, Message message, Handler<ProtonDelivery> onUpdated) {
 if (anonymousSender) {
  if (message.getAddress() == null) {
   throw new IllegalArgumentException("Message must have an address when using anonymous sender.");
  }
 }
 // TODO: prevent odd combination of onReceived callback + SenderSettleMode.SETTLED, or just allow it?

 // Begin a new delivery for this tag.
 final Delivery rawDelivery = sender().delivery(tag);

 // Encode the message and pass the encoded bytes on without an extra copy.
 final ProtonWritableBufferImpl encodeTarget = new ProtonWritableBufferImpl();
 ((MessageImpl) message).encode(encodeTarget);
 final ReadableBuffer payload = new ProtonReadableBufferImpl(encodeTarget.getBuffer());
 sender().sendNoCopy(payload);

 if (link.getSenderSettleMode() == SenderSettleMode.SETTLED) {
  rawDelivery.settle();
 }

 // Mark the delivery as complete.
 sender().advance();

 final ProtonDeliveryImpl result = new ProtonDeliveryImpl(rawDelivery);
 if (onUpdated == null) {
  result.setAutoSettle(true);
 } else {
  result.setAutoSettle(autoSettle);
  result.handler(onUpdated);
 }

 getSession().getConnectionImpl().flush();
 return result;
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-io-amqp

/**
 * Verifies that the body, address and subject of a message survive a round trip
 * through {@link AmqpMessageCoder}.
 */
@Test
public void encodeDecode() throws Exception {
 final Message original = Message.Factory.create();
 original.setBody(new AmqpValue("body"));
 original.setAddress("address");
 original.setSubject("test");

 final Message roundTripped = CoderUtils.clone(AmqpMessageCoder.of(), original);

 assertEquals("AmqpValue{body}", roundTripped.getBody().toString());
 assertEquals("address", roundTripped.getAddress());
 assertEquals("test", roundTripped.getSubject());
}

代码示例来源:origin: eclipse/hono

items.put(Tags.MESSAGE_BUS_DESTINATION.getKey(), message.getAddress());
msgSpan.log(items);
validateEndpoint(message.getAddress(), delivery)
.compose(address -> validateAddress(address, authenticatedDevice))
.recover(t -> {

相关文章