Usage and code examples for the org.apache.qpid.proton.message.Message.setMessageId() method

x33g5p2x, reposted on 2022-01-25 under Other

This article collects code examples for the Java method org.apache.qpid.proton.message.Message.setMessageId() and shows how Message.setMessageId() is used in practice. The examples were extracted from selected projects found on platforms such as GitHub, Stack Overflow, and Maven, and are meant as a practical reference. Details of the method:
Package: org.apache.qpid.proton.message
Class name: Message
Method name: setMessageId

About Message.setMessageId

No description available.

Code examples

Code example source: EnMasseProject/enmasse

/**
 * Return a raw AMQP message
 *
 * @return the AMQP message carrying the subject and message ID
 */
public Message toAmqp() {
  Message message = ProtonHelper.message();
  message.setSubject(AMQP_SUBJECT);
  message.setMessageId(this.messageId);
  return message;
}

Code example source: apache/activemq-artemis

/**
 * Sets the MessageId property on an outbound message using the provided value
 *
 * @param messageId the message ID value to set.
 */
public void setRawMessageId(Object messageId) {
  checkReadOnly();
  lazyCreateProperties();
  getWrappedMessage().setMessageId(messageId);
}

Code example source: apache/activemq-artemis

/**
 * Sets the MessageId property on an outbound message using the provided String
 *
 * @param messageId the String message ID value to set.
 */
public void setMessageId(String messageId) {
  checkReadOnly();
  lazyCreateProperties();
  getWrappedMessage().setMessageId(messageId);
}

Code example source: Azure/azure-event-hubs-java

public void request(
    final Message message,
    final OperationResult<Message, Exception> onResponse) {
  if (message == null)
    throw new IllegalArgumentException("message cannot be null");
  if (message.getMessageId() != null)
    throw new IllegalArgumentException("message.getMessageId() should be null");
  if (message.getReplyTo() != null)
    throw new IllegalArgumentException("message.getReplyTo() should be null");
  message.setMessageId("request" + UnsignedLong.valueOf(this.requestId.incrementAndGet()).toString());
  message.setReplyTo(this.replyTo);
  this.inflightRequests.put(message.getMessageId(), onResponse);
  sendLink.delivery(UUID.randomUUID().toString().replace("-", StringUtil.EMPTY).getBytes());
  final int payloadSize = AmqpUtil.getDataSerializedSize(message) + 512; // need buffer for headers
  final byte[] bytes = new byte[payloadSize];
  final int encodedSize = message.encode(bytes, 0, payloadSize);
  receiveLink.flow(1);
  sendLink.send(bytes, 0, encodedSize);
  sendLink.advance();
}
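
The request method above derives the message ID from a counter and stores the response callback under that ID. The following is a minimal sketch of that bookkeeping using only the JDK. `RequestCorrelator`, `register`, `complete`, and `pendingCount` are hypothetical names for illustration and are not part of Proton-J or the Azure SDK. The sketch also assumes that the reply carries the request's message ID as its correlation ID.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper (not a Proton-J or Azure SDK class). */
final class RequestCorrelator {
    private final AtomicLong requestCounter = new AtomicLong();
    private final Map<String, CompletableFuture<String>> inflight = new ConcurrentHashMap<>();

    /** Allocates the next ID and remembers the future that waits for the reply. */
    String register(CompletableFuture<String> onResponse) {
        String id = "request" + requestCounter.incrementAndGet();
        inflight.put(id, onResponse);
        return id; // a caller would pass this value to message.setMessageId(id)
    }

    /** Completes and forgets the pending request that matches the correlation ID. */
    boolean complete(String correlationId, String body) {
        CompletableFuture<String> pending = inflight.remove(correlationId);
        if (pending == null) {
            return false; // unknown ID, or the request was already completed
        }
        pending.complete(body);
        return true;
    }

    /** Number of requests still waiting for a reply. */
    int pendingCount() {
        return inflight.size();
    }

    public static void main(String[] args) {
        RequestCorrelator correlator = new RequestCorrelator();
        CompletableFuture<String> reply = new CompletableFuture<>();
        String id = correlator.register(reply);
        correlator.complete(id, "ok");
        System.out.println(id + " -> " + reply.join());
    }
}
```

Removing the entry when the reply arrives keeps the map from growing, and a second reply with the same ID is reported as unmatched rather than completing the future twice.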

Code example source: org.eclipse.hono/hono-client

/**
 * Build a Proton message with a provided subject (serving as the operation that shall be invoked).
 * The message can be extended by arbitrary application properties passed in.
 * <p>
 * To enable specific message properties that are not considered here, the method can be overridden by subclasses.
 *
 * @param subject The subject system property of the message.
 * @param appProperties The map containing arbitrary application properties.
 *                      May be null if no application properties are needed.
 * @return The Proton message constructed from the provided parameters.
 * @throws NullPointerException if the subject is {@code null}.
 * @throws IllegalArgumentException if the application properties contain values that are not AMQP 1.0 compatible
 *                  (see {@link AbstractHonoClient#setApplicationProperties(Message, Map)}).
 */
private Message createMessage(final String subject, final Map<String, Object> appProperties) {
  Objects.requireNonNull(subject);
  final Message msg = ProtonHelper.message();
  final String messageId = createMessageId();
  AbstractHonoClient.setApplicationProperties(msg, appProperties);
  msg.setReplyTo(replyToAddress);
  msg.setMessageId(messageId);
  msg.setSubject(subject);
  return msg;
}

Code example source: Azure/azure-service-bus-java

public CompletableFuture<Message> requestAysnc(Message requestMessage, TransactionContext transaction, Duration timeout)
{
  this.throwIfClosed(null);
  
  CompletableFuture<Message> responseFuture = new CompletableFuture<Message>();
  RequestResponseWorkItem workItem = new RequestResponseWorkItem(requestMessage, transaction, responseFuture, timeout);
  String requestId = "request:" +  this.requestCounter.incrementAndGet();
  requestMessage.setMessageId(requestId);
  requestMessage.setReplyTo(this.replyTo);
  this.pendingRequests.put(requestId, workItem);
  workItem.setTimeoutTask(this.scheduleRequestTimeout(requestId, timeout));
  TRACE_LOGGER.debug("Sending request with id:{}", requestId);
  this.amqpSender.sendRequest(requestId, false);
  
  // Check and recreate links if necessary
  if(!((this.amqpSender.sendLink.getLocalState() == EndpointState.ACTIVE && this.amqpSender.sendLink.getRemoteState() == EndpointState.ACTIVE)
      && (this.amqpReceiver.receiveLink.getLocalState() == EndpointState.ACTIVE && this.amqpReceiver.receiveLink.getRemoteState() == EndpointState.ACTIVE)))
  {
    this.ensureUniqueLinkRecreation();
  }
  
  return responseFuture;
}

Code example source: org.eclipse.hono/hono-client

message.setMessageId(messageId);
final Map<String, Object> details = new HashMap<>(3);
details.put(TracingHelper.TAG_MESSAGE_ID.getKey(), messageId);

Code example source: org.eclipse.hono/hono-core

message.setMessageId(UUID.randomUUID().toString());
message.setCorrelationId(correlationId);
message.setAddress(address.toString());

Code example source: eclipse/hono

message.setMessageId(UUID.randomUUID().toString());
message.setCorrelationId(correlationId);
message.setAddress(address.toString());
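
The two Hono fragments above give a message a fresh random ID while also setting a correlation ID. The following is a minimal sketch of that pairing, assuming the message being built is a reply whose correlation ID is the request's message ID. `SimpleMessage` and `Replies` are hypothetical stand-ins so the example runs with the JDK alone. They are not Proton-J or Hono classes, and only the four accessor names mirror methods on the real `Message` interface.

```java
import java.util.UUID;

/** Hypothetical stand-in for the few Message properties used in this sketch. */
final class SimpleMessage {
    private Object messageId;
    private Object correlationId;

    void setMessageId(Object id) { this.messageId = id; }
    Object getMessageId() { return messageId; }
    void setCorrelationId(Object id) { this.correlationId = id; }
    Object getCorrelationId() { return correlationId; }
}

/** Hypothetical helper that builds replies. */
final class Replies {
    private Replies() { }

    /** Builds a reply with its own random ID that points back at the request. */
    static SimpleMessage replyTo(SimpleMessage request) {
        SimpleMessage reply = new SimpleMessage();
        reply.setMessageId(UUID.randomUUID().toString()); // unique per reply
        reply.setCorrelationId(request.getMessageId());   // links reply to request
        return reply;
    }

    public static void main(String[] args) {
        SimpleMessage request = new SimpleMessage();
        request.setMessageId("4711");
        SimpleMessage reply = replyTo(request);
        System.out.println(reply.getMessageId() + " correlates to " + reply.getCorrelationId());
    }
}
```

Keeping the two IDs separate lets a receiver deduplicate replies by message ID and still match each reply to its request by correlation ID.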

Code example source: eclipse/hono

/**
 * Verifies that the endpoint forwards a request message via the event bus.
 */
@Test
public void testProcessMessageSendsRequestViaEventBus() {

  final Message msg = ProtonHelper.message();
  msg.setMessageId("4711");
  msg.setSubject(RegistrationConstants.ACTION_ASSERT);
  msg.setBody(new AmqpValue(new JsonObject().put("temp", 15).encode()));
  MessageHelper.annotate(msg, resource);
  endpoint.processRequest(msg, resource, Constants.PRINCIPAL_ANONYMOUS);

  verify(eventBus).send(eq(RegistrationConstants.EVENT_BUS_ADDRESS_REGISTRATION_IN), any(JsonObject.class), any(DeliveryOptions.class));
}

Code example source: org.eclipse.hono/hono-client

message.setMessageId(messageId);
final Map<String, Object> details = new HashMap<>(2);
details.put(TracingHelper.TAG_MESSAGE_ID.getKey(), messageId);

Code example source: eclipse/hono

/**
 * Verifies that the endpoint forwards a request message via the event bus.
 */
@Test
public void testProcessMessageSendsRequestViaEventBus() {

  final Message msg = ProtonHelper.message();
  msg.setMessageId("random-id");
  msg.setSubject(TenantConstants.TenantAction.get.toString());
  MessageHelper.addTenantId(msg, Constants.DEFAULT_TENANT);
  MessageHelper.annotate(msg, resource);

  endpoint.processRequest(msg, resource, Constants.PRINCIPAL_ANONYMOUS);

  verify(eventBus).send(eq(TenantConstants.EVENT_BUS_ADDRESS_TENANT_IN), any(JsonObject.class), any(DeliveryOptions.class));
}

Code example source: eclipse/hono

private static Message givenAMessageHavingProperties(final String deviceId, final String action) {
  final Message msg = ProtonHelper.message();
  msg.setMessageId("msg-id");
  msg.setReplyTo("reply");
  msg.setSubject(action);
  if (deviceId != null) {
    MessageHelper.addDeviceId(msg, deviceId);
  }
  return msg;
}

Code example source: eclipse/hono

private static Message givenAValidMessageWithoutBody(final CredentialsConstants.CredentialsAction action) {
  final Message msg = ProtonHelper.message();
  msg.setMessageId("msg");
  msg.setReplyTo("reply");
  msg.setSubject(action.toString());
  return msg;
}

Code example source: eclipse/hono

private Message givenAMessageHavingProperties(final TenantConstants.TenantAction action) {
  final Message msg = ProtonHelper.message();
  msg.setMessageId("msg");
  msg.setReplyTo("reply");
  msg.setSubject(action.toString());
  return msg;
}

Code example source: eclipse/hono

/**
 * Verifies that the endpoint forwards a request message via the event bus.
 */
@Test
public void testProcessMessageSendsRequestViaEventBus() {

  final Message msg = ProtonHelper.message();
  msg.setMessageId("random-id");
  msg.setSubject(CredentialsConstants.CredentialsAction.add.toString());
  MessageHelper.addDeviceId(msg, "4711");
  MessageHelper.addTenantId(msg, Constants.DEFAULT_TENANT);
  MessageHelper.annotate(msg, resource);

  msg.setBody(new AmqpValue(new JsonObject().put("temp", 15).encode()));

  endpoint.processRequest(msg, resource, Constants.PRINCIPAL_ANONYMOUS);

  verify(eventBus).send(eq(CredentialsConstants.EVENT_BUS_ADDRESS_CREDENTIALS_IN), any(JsonObject.class), any(DeliveryOptions.class));
}

Code example source: org.eclipse.hono/hono-client

/**
 * {@inheritDoc}
 */
@Override
public Future<Void> sendOneWayCommand(final String command, final String contentType, final Buffer data, final Map<String, Object> properties) {
  Objects.requireNonNull(command);
  final Span currentSpan = newChildSpan(null, command);
  if (sender.isOpen()) {
    final Future<BufferResult> responseTracker = Future.future();
    final Message request = ProtonHelper.message();
    AbstractHonoClient.setApplicationProperties(request, properties);
    final String messageId = createMessageId();
    request.setMessageId(messageId);
    request.setSubject(command);
    MessageHelper.setPayload(request, contentType, data);
    sendRequest(request, responseTracker.completer(), null, currentSpan);
    return responseTracker.map(ignore -> null);
  } else {
    TracingHelper.logError(currentSpan, "sender link is not open");
    return Future.failedFuture(new ServerErrorException(
        HttpURLConnection.HTTP_UNAVAILABLE, "sender link is not open"));
  }
}

Code example source: EnMasseProject/enmasse

/**
 * Return a raw AMQP message
 *
 * @return the AMQP message built from this MQTT publish, including annotations, header, and payload
 */
public Message toAmqp() {
  Message message = ProtonHelper.message();
  message.setMessageId(this.messageId);
  Map<Symbol, Object> map = new HashMap<>();
  map.put(Symbol.valueOf(AMQP_RETAIN_ANNOTATION), this.isRetain);
  map.put(Symbol.valueOf(AMQP_QOS_ANNOTATION), this.qos.value());
  MessageAnnotations messageAnnotations = new MessageAnnotations(map);
  message.setMessageAnnotations(messageAnnotations);
  message.setAddress(this.topic);
  Header header = new Header();
  header.setDurable(this.qos != MqttQoS.AT_MOST_ONCE);
  message.setHeader(header);
  message.setDeliveryCount(this.isDup ? 1 : 0);
  // the payload could be null (or empty)
  if (this.payload != null)
    message.setBody(new Data(new Binary(this.payload.getBytes())));
  return message;
}

Code example source: eclipse/hono

/**
 * Verifies that the filter detects a missing subject.
 */
@Test
public void testVerifyDetectsMissingSubject() {
  // GIVEN a request message without a subject
  final Message msg = ProtonHelper.message();
  msg.setMessageId("msg");
  msg.setReplyTo("reply");
  // WHEN receiving the message via a link with any tenant
  final ResourceIdentifier linkTarget = getResourceIdentifier(DEFAULT_TENANT);
  // THEN message validation fails
  assertFalse(TenantMessageFilter.verify(linkTarget, msg));
}

Code example source: Azure/azure-service-bus-java

amqpMessage.setMessageId(brokeredMessage.getMessageId());
amqpMessage.setContentType(brokeredMessage.getContentType());
amqpMessage.setCorrelationId(brokeredMessage.getCorrelationId());
