本文整理了Java中org.apache.qpid.proton.message.Message.getCorrelationId()方法的一些代码示例,展示了Message.getCorrelationId()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Message.getCorrelationId()方法的具体详情如下:
包路径:org.apache.qpid.proton.message.Message
类名称:Message
方法名:getCorrelationId
方法说明:暂无
代码示例来源:origin: org.eclipse.hono/hono-server
/**
 * Determines the identifier used to correlate a response with the given request.
 *
 * @param request the request message from which to extract the correlation identifier
 * @return the request's correlation-id if it is not {@code null} (Correlation ID Pattern),
 *         otherwise the request's message-id (Message ID Pattern)
 */
private Object getCorrelationId(final Message request) {
    final Object correlationId = request.getCorrelationId();
    // fall back to the message id when the request carries no correlation id
    return correlationId != null ? correlationId : request.getMessageId();
}
}
代码示例来源:origin: org.eclipse.hono/hono-core
/**
 * Determines whether an AMQP message carries an identifier usable for correlation.
 *
 * @param msg The message to inspect.
 * @return {@code true} if the message has a message ID or a correlation ID,
 *         {@code false} if both are {@code null}.
 */
protected static final boolean hasCorrelationId(final Message msg) {
    if (msg.getMessageId() != null || msg.getCorrelationId() != null) {
        return true;
    }
    // neither identifier is present: trace the reason before rejecting
    LOG.trace("message has neither a message-id nor correlation-id");
    return false;
}
}
代码示例来源:origin: eclipse/hono
/**
 * Tells whether the given AMQP message can be correlated, i.e. whether it has
 * a message ID or a correlation ID.
 *
 * @param msg The message to inspect.
 * @return {@code true} if at least one of the two identifiers is not {@code null}.
 */
protected static final boolean hasCorrelationId(final Message msg) {
    final boolean correlatable = msg.getMessageId() != null || msg.getCorrelationId() != null;
    if (!correlatable) {
        LOG.trace("message has neither a message-id nor correlation-id");
    }
    return correlatable;
}
}
代码示例来源:origin: EnMasseProject/enmasse
/**
 * Return an AMQP_CLOSE message from the raw AMQP one
 *
 * @param message raw AMQP message
 * @return AMQP_CLOSE message
 * @throws IllegalArgumentException if the message subject is missing or differs from AMQP_SUBJECT
 */
public static AmqpCloseMessage from(Message message) {
    // Constant-first comparison: a message without a subject is rejected with the
    // documented IllegalArgumentException instead of a NullPointerException.
    if (!AMQP_SUBJECT.equals(message.getSubject())) {
        throw new IllegalArgumentException(String.format("AMQP message subject is no %s", AMQP_SUBJECT));
    }
    // NOTE(review): the cast assumes the correlation-id is always a String — confirm with senders.
    final String correlationId = (String) message.getCorrelationId();
    return new AmqpCloseMessage(AmqpHelper.getClientIdFromPublishAddress(correlationId));
}
代码示例来源:origin: EnMasseProject/enmasse
/**
 * Return an AMQP_LIST message from the raw AMQP one
 *
 * @param message raw AMQP message
 * @return AMQP_LIST message
 * @throws IllegalArgumentException if the message subject is missing or differs from AMQP_SUBJECT
 */
public static AmqpListMessage from(Message message) {
    // Constant-first comparison: a message without a subject is rejected with the
    // documented IllegalArgumentException instead of a NullPointerException.
    if (!AMQP_SUBJECT.equals(message.getSubject())) {
        throw new IllegalArgumentException(String.format("AMQP message subject is no %s", AMQP_SUBJECT));
    }
    // NOTE(review): the cast assumes the correlation-id is always a String — confirm with senders.
    final String correlationId = (String) message.getCorrelationId();
    return new AmqpListMessage(AmqpHelper.getClientIdFromPublishAddress(correlationId));
}
代码示例来源:origin: org.eclipse.hono/hono-core
/**
 * Sets the correlation identifier property from an AMQP message.
 * <p>
 * The value of the property is taken from
 * <ol>
 * <li>the AMQP message's correlation identifier, if not {@code null}, or</li>
 * <li>the AMQP message's message identifier, if not {@code null}.</li>
 * </ol>
 *
 * @param message The AMQP message to retrieve the value from.
 * @return This message for chaining.
 * @throws IllegalArgumentException if the message contains neither a correlation id
 *                                  nor a message id.
 */
public EventBusMessage setCorrelationId(final Message message) {
    final Object correlationId = message.getCorrelationId();
    if (correlationId != null) {
        return setCorrelationId(correlationId);
    }
    // no correlation id: the message id is the only remaining candidate
    final Object messageId = message.getMessageId();
    if (messageId != null) {
        return setCorrelationId(messageId);
    }
    throw new IllegalArgumentException("message does not contain message-id nor correlation-id");
}
代码示例来源:origin: eclipse/hono
/**
 * Derives the correlation identifier property from an AMQP message.
 * <p>
 * The first non-{@code null} value of the following is used:
 * <ol>
 * <li>the AMQP message's correlation identifier,</li>
 * <li>the AMQP message's message identifier.</li>
 * </ol>
 *
 * @param message The AMQP message to retrieve the value from.
 * @return This message for chaining.
 * @throws IllegalArgumentException if both the correlation id and the message id
 *                                  of the message are {@code null}.
 */
public EventBusMessage setCorrelationId(final Message message) {
    Object identifier = message.getCorrelationId();
    if (identifier == null) {
        // fall back to the message id when no correlation id is set
        identifier = message.getMessageId();
    }
    if (identifier == null) {
        throw new IllegalArgumentException("message does not contain message-id nor correlation-id");
    }
    return setCorrelationId(identifier);
}
代码示例来源:origin: EnMasseProject/enmasse
/**
 * Return an AMQP_UNSUBSCRIBE message from the raw AMQP one
 *
 * @param message raw AMQP message
 * @return AMQP_UNSUBSCRIBE message
 * @throws IllegalArgumentException if the message subject is missing or differs from AMQP_SUBJECT,
 *                                  or if the body is not an AmqpValue section
 */
@SuppressWarnings("unchecked")
public static AmqpUnsubscribeMessage from(Message message) {
    // Constant-first comparison: a message without a subject is rejected with the
    // documented IllegalArgumentException instead of a NullPointerException.
    if (!AMQP_SUBJECT.equals(message.getSubject())) {
        throw new IllegalArgumentException(String.format("AMQP message subject is no %s", AMQP_SUBJECT));
    }

    Section section = message.getBody();
    // instanceof is already false for null, so no separate null check is needed
    if (!(section instanceof AmqpValue)) {
        throw new IllegalArgumentException("AMQP message wrong body type");
    }

    // NOTE(review): unchecked cast assumes the body value is a list of topic names — confirm with senders.
    List<String> topics = (List<String>) ((AmqpValue) section).getValue();
    // NOTE(review): the cast assumes the correlation-id is always a String — confirm with senders.
    final String correlationId = (String) message.getCorrelationId();
    return new AmqpUnsubscribeMessage(AmqpHelper.getClientIdFromPublishAddress(correlationId), topics);
}
代码示例来源:origin: EnMasseProject/enmasse
/**
 * Return an AMQP_SUBSCRIBE message from the raw AMQP one
 *
 * @param message raw AMQP message
 * @return AMQP_SUBSCRIBE message
 * @throws IllegalArgumentException if the message subject is missing or differs from AMQP_SUBJECT,
 *                                  or if the body is not an AmqpValue section
 */
@SuppressWarnings("unchecked")
public static AmqpSubscribeMessage from(Message message) {
    // Constant-first comparison: a message without a subject is rejected with the
    // documented IllegalArgumentException instead of a NullPointerException.
    if (!AMQP_SUBJECT.equals(message.getSubject())) {
        throw new IllegalArgumentException(String.format("AMQP message subject is no %s", AMQP_SUBJECT));
    }

    Section section = message.getBody();
    // instanceof is already false for null, so no separate null check is needed
    if (!(section instanceof AmqpValue)) {
        throw new IllegalArgumentException("AMQP message wrong body type");
    }

    // NOTE(review): unchecked cast assumes the body value maps topic -> QoS level as text — confirm with senders.
    Map<String, String> map = (Map<String, String>) ((AmqpValue) section).getValue();

    // build the unique topic subscriptions list
    List<AmqpTopicSubscription> topicSubscriptions = new ArrayList<>();
    for (Map.Entry<String, String> entry : map.entrySet()) {
        // parseInt yields the primitive directly, avoiding a box/unbox round trip
        topicSubscriptions.add(new AmqpTopicSubscription(entry.getKey(), MqttQoS.valueOf(Integer.parseInt(entry.getValue()))));
    }

    // NOTE(review): the cast assumes the correlation-id is always a String — confirm with senders.
    final String correlationId = (String) message.getCorrelationId();
    return new AmqpSubscribeMessage(AmqpHelper.getClientIdFromPublishAddress(correlationId), topicSubscriptions);
}
代码示例来源:origin: Azure/azure-event-hubs-java
/**
 * Decodes the message carried by a completed delivery, settles the delivery and
 * hands the decoded response to the in-flight request registered under the
 * response's correlation id, if there is one.
 *
 * @param delivery the completed delivery to read the response from
 */
@Override
public void onReceiveComplete(Delivery delivery) {
    // pull the pending bytes of this delivery off the receive link
    final int pendingBytes = delivery.pending();
    final byte[] encoded = new byte[pendingBytes];
    final int bytesRead = receiveLink.recv(encoded, 0, pendingBytes);

    final Message response = Proton.message();
    response.decode(encoded, 0, bytesRead);
    delivery.settle();

    // remove the matching request so that it is completed at most once
    final OperationResult<Message, Exception> pendingRequest = inflightRequests.remove(response.getCorrelationId());
    if (pendingRequest != null) {
        pendingRequest.onComplete(response);
    }
}
代码示例来源:origin: eclipse/hono
/**
 * Verifies that a tenant message carries all required properties.
 * <p>
 * The message is rejected if it has neither a message-id nor a correlation-id,
 * has no subject, has no reply-to address, or has a body for which
 * {@code MessageHelper.hasDataBody(msg)} returns {@code false}.
 *
 * @param linkTarget The resource path of the link target (not consulted by the checks in this method).
 * @param msg The AMQP 1.0 message to perform the checks on.
 * @return {@code true} if the message passes all checks.
 */
public static boolean verify(final ResourceIdentifier linkTarget, final Message msg) {
    if (msg.getMessageId() == null && msg.getCorrelationId() == null) {
        LOG.trace("message has neither a message-id nor correlation-id");
        return false;
    }
    if (msg.getSubject() == null) {
        LOG.trace("message [{}] does not contain subject", msg.getMessageId());
        return false;
    }
    if (msg.getReplyTo() == null) {
        LOG.trace("message [{}] contains no reply-to address", msg.getMessageId());
        return false;
    }
    // a missing body is acceptable; only a body that is present gets checked
    if (msg.getBody() != null && !MessageHelper.hasDataBody(msg)) {
        LOG.trace("message [{}] contains no Data section payload", msg.getMessageId());
        return false;
    }
    return true;
}
}
代码示例来源:origin: Azure/azure-service-bus-java
String requestMessageId = (String)finalResponseMessage.getCorrelationId();
if(requestMessageId != null)
代码示例来源:origin: org.eclipse.hono/hono-client
final TriTuple<Handler<AsyncResult<R>>, Object, Span> handler = replyMap.remove(message.getCorrelationId());
replyToAddress, message.getCorrelationId());
ProtonHelper.rejected(delivery, true);
} else {
if (response == null) {
LOG.debug("discarding malformed response lacking status code [reply-to: {}, correlation ID: {}]",
replyToAddress, message.getCorrelationId());
TracingHelper.logError(span, "response from peer released (no status code)");
ProtonHelper.released(delivery, true);
} else {
LOG.debug("received response [reply-to: {}, subject: {}, correlation ID: {}, status: {}]",
replyToAddress, message.getSubject(), message.getCorrelationId(), response.getStatus());
addToCache(handler.two(), response);
if (span != null) {
代码示例来源:origin: eclipse/hono
/**
 * Verifies that a credentials message carries all required properties.
 * <p>
 * The message is rejected if it has neither a message-id nor a correlation-id,
 * if its subject is not accepted by {@code CredentialsConstants.CredentialsAction.isValid},
 * if it has no reply-to address, or if {@code MessageHelper.hasDataBody(msg)}
 * returns {@code false}.
 *
 * @param linkTarget The resource path of the link target (not consulted by the checks in this method).
 * @param msg The AMQP 1.0 message to perform the checks on.
 * @return {@code true} if the message passes all checks.
 */
public static boolean verify(final ResourceIdentifier linkTarget, final Message msg) {
    if (msg.getMessageId() == null && msg.getCorrelationId() == null) {
        LOG.trace("message has neither a message-id nor correlation-id");
        return false;
    }
    if (!CredentialsConstants.CredentialsAction.isValid(msg.getSubject())) {
        LOG.trace("message [{}] does not contain valid subject property", msg.getMessageId());
        return false;
    }
    if (msg.getReplyTo() == null) {
        LOG.trace("message [{}] has no reply-to address set", msg.getMessageId());
        return false;
    }
    if (!MessageHelper.hasDataBody(msg)) {
        LOG.trace("message [{}] contains no Data section payload", msg.getMessageId());
        return false;
    }
    return true;
}
代码示例来源:origin: org.eclipse.hono/hono-service-base
/**
 * Verifies that a credentials message carries all required properties.
 * <p>
 * The message is rejected if it has neither a message-id nor a correlation-id,
 * if its subject is not accepted by {@code CredentialsConstants.CredentialsAction.isValid},
 * if it has no reply-to address, or if {@code MessageHelper.hasDataBody(msg, true)}
 * returns {@code false}.
 *
 * @param linkTarget The resource path of the link target (not consulted by the checks in this method).
 * @param msg The AMQP 1.0 message to perform the checks on.
 * @return {@code true} if the message passes all checks.
 */
public static boolean verify(final ResourceIdentifier linkTarget, final Message msg) {
    if (msg.getMessageId() == null && msg.getCorrelationId() == null) {
        LOG.trace("message has neither a message-id nor correlation-id");
        return false;
    }
    if (!CredentialsConstants.CredentialsAction.isValid(msg.getSubject())) {
        LOG.trace("message [{}] does not contain valid subject property", msg.getMessageId());
        return false;
    }
    if (msg.getReplyTo() == null) {
        LOG.trace("message [{}] has no reply-to address set", msg.getMessageId());
        return false;
    }
    // NOTE(review): the 'true' flag presumably also admits AmqpValue bodies, as the trace text suggests — confirm.
    if (!MessageHelper.hasDataBody(msg, true)) {
        LOG.trace("message [{}] contains no AmqpValue or Data section payload", msg.getMessageId());
        return false;
    }
    return true;
}
代码示例来源:origin: org.eclipse.hono/hono-service-base
/**
 * Verifies that a tenant message carries all required properties.
 * <p>
 * The message is rejected if it has neither a message-id nor a correlation-id,
 * has no subject, has no reply-to address, or has a body for which
 * {@code MessageHelper.hasDataBody(msg, true)} returns {@code false}.
 *
 * @param linkTarget The resource path of the link target (not consulted by the checks in this method).
 * @param msg The AMQP 1.0 message to perform the checks on.
 * @return {@code true} if the message passes all checks.
 */
public static boolean verify(final ResourceIdentifier linkTarget, final Message msg) {
    if (msg.getMessageId() == null && msg.getCorrelationId() == null) {
        LOG.trace("message has neither a message-id nor correlation-id");
        return false;
    }
    if (msg.getSubject() == null) {
        LOG.trace("message [{}] does not contain subject", msg.getMessageId());
        return false;
    }
    if (msg.getReplyTo() == null) {
        LOG.trace("message [{}] contains no reply-to address", msg.getMessageId());
        return false;
    }
    // a missing body is acceptable; only a body that is present gets checked
    // NOTE(review): the 'true' flag presumably also admits AmqpValue bodies, as the trace text suggests — confirm.
    if (msg.getBody() != null && !MessageHelper.hasDataBody(msg, true)) {
        LOG.trace("message [{}] contains no AmqpValue or Data section payload", msg.getMessageId());
        return false;
    }
    return true;
}
}
代码示例来源:origin: org.eclipse.hono/hono-server
} else if (msg.getMessageId() == null && msg.getCorrelationId() == null) {
LOG.trace("message has neither a message-id nor correlation-id");
return false;
代码示例来源:origin: eclipse/hono
private Future<ProtonDelivery> doUploadCommandResponseMessage(final AmqpContext context, final Span currentSpan) {
final String correlationId = Optional.ofNullable(context.getMessage().getCorrelationId())
.map(id -> {
if (id instanceof String) {
代码示例来源:origin: eclipse/hono
if (msg.getCorrelationId() == null) {
代码示例来源:origin: Azure/azure-event-hubs-java
// Copy each optional AMQP property into receiveProperties only when it is set on the message.
if (amqpMessage.getReplyTo() != null) {
    receiveProperties.put(AmqpConstants.AMQP_PROPERTY_REPLY_TO, amqpMessage.getReplyTo());
}
if (amqpMessage.getCorrelationId() != null) {
    receiveProperties.put(AmqpConstants.AMQP_PROPERTY_CORRELATION_ID, amqpMessage.getCorrelationId());
}
if (amqpMessage.getContentType() != null) {
    receiveProperties.put(AmqpConstants.AMQP_PROPERTY_CONTENT_TYPE, amqpMessage.getContentType());
}
内容来源于网络,如有侵权,请联系作者删除!