org.apache.qpid.proton.message.Message.setSubject()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(10.9k)|赞(0)|评价(0)|浏览(137)

本文整理了Java中org.apache.qpid.proton.message.Message.setSubject()方法的一些代码示例,展示了Message.setSubject()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Message.setSubject()方法的具体详情如下:
包路径:org.apache.qpid.proton.message.Message
类名称:Message
方法名:setSubject

Message.setSubject介绍

暂无

代码示例

代码示例来源:origin: EnMasseProject/enmasse

/**
 * Builds the raw AMQP representation of this object.
 *
 * <p>The subject is {@code AMQP_SUBJECT}; the correlation id is the client publish address
 * template formatted with this object's client id.
 *
 * @return the newly created AMQP message
 */
public Message toAmqp() {
  final Message amqp = ProtonHelper.message();
  amqp.setSubject(AMQP_SUBJECT);

  final String publishAddress =
      String.format(AmqpHelper.AMQP_CLIENT_PUBLISH_ADDRESS_TEMPLATE, this.clientId);
  amqp.setCorrelationId(publishAddress);

  return amqp;
}

代码示例来源:origin: EnMasseProject/enmasse

/**
 * Converts this object into a raw AMQP message.
 *
 * <p>The subject is {@code AMQP_SUBJECT}. The correlation id and the reply-to address are
 * obtained by formatting the client publish and the client control address templates,
 * respectively, with this object's client id.
 *
 * @return the newly created AMQP message
 */
public Message toAmqp() {
  final Message amqp = ProtonHelper.message();
  amqp.setSubject(AMQP_SUBJECT);

  final String publishAddress =
      String.format(AmqpHelper.AMQP_CLIENT_PUBLISH_ADDRESS_TEMPLATE, this.clientId);
  amqp.setCorrelationId(publishAddress);

  final String controlAddress =
      String.format(AmqpHelper.AMQP_CLIENT_CONTROL_ADDRESS_TEMPLATE, this.clientId);
  amqp.setReplyTo(controlAddress);

  return amqp;
}

代码示例来源:origin: EnMasseProject/enmasse

/**
 * Converts this object into a raw AMQP message.
 *
 * <p>Only two properties are populated: the subject ({@code AMQP_SUBJECT}) and the message
 * id (this object's {@code messageId}).
 *
 * @return the newly created AMQP message
 */
public Message toAmqp() {
  final Message amqp = ProtonHelper.message();

  amqp.setSubject(AMQP_SUBJECT);
  amqp.setMessageId(this.messageId);

  return amqp;
}

代码示例来源:origin: apache/activemq-artemis

/**
 * Assigns the given subject to the wrapped message.
 *
 * <p>Before the value is written, {@code checkReadOnly()} and {@code lazyCreateProperties()}
 * are invoked, in that order.
 *
 * @param subject the subject value to store on the wrapped message.
 */
public void setSubject(String subject) {
 this.checkReadOnly();
 this.lazyCreateProperties();

 this.getWrappedMessage().setSubject(subject);
}

代码示例来源:origin: EnMasseProject/enmasse

/**
 * Converts this object into a raw AMQP message.
 *
 * <p>The subject is {@code AMQP_SUBJECT} and the correlation id is the client publish
 * address template formatted with this object's client id. The body is an
 * {@code AmqpValue} wrapping a map from each subscribed topic to its QoS value rendered
 * as a string.
 *
 * @return the newly created AMQP message
 */
public Message toAmqp() {
  final Message amqp = ProtonHelper.message();
  amqp.setSubject(AMQP_SUBJECT);
  amqp.setCorrelationId(
      String.format(AmqpHelper.AMQP_CLIENT_PUBLISH_ADDRESS_TEMPLATE, this.clientId));

  // topic -> QoS (as String); a later subscription for the same topic overwrites an earlier one
  final Map<String, String> qosByTopic = new HashMap<>();
  this.topicSubscriptions.stream().forEach(subscription ->
      qosByTopic.put(subscription.topic(), String.valueOf(subscription.qos().value())));
  amqp.setBody(new AmqpValue(qosByTopic));

  return amqp;
}

代码示例来源:origin: EnMasseProject/enmasse

/**
 * Converts this object into a raw AMQP message.
 *
 * <p>The subject is {@code AMQP_SUBJECT}. The body is an {@code AmqpValue} wrapping a map
 * from each subscribed topic to its QoS value rendered as a string.
 *
 * @return the newly created AMQP message
 */
public Message toAmqp() {
  final Message amqp = ProtonHelper.message();
  amqp.setSubject(AMQP_SUBJECT);

  // topic -> QoS (as String)
  final Map<String, String> qosByTopic = new HashMap<>();
  this.topicSubscriptions.stream().forEach(subscription -> {
    final String qos = String.valueOf(subscription.qos().value());
    qosByTopic.put(subscription.topic(), qos);
  });

  amqp.setBody(new AmqpValue(qosByTopic));
  return amqp;
}

代码示例来源:origin: EnMasseProject/enmasse

/**
 * Converts this object into a raw AMQP message.
 *
 * <p>The subject is {@code AMQP_SUBJECT}, the correlation id is the client publish address
 * template formatted with this object's client id, and the body is an {@code AmqpValue}
 * wrapping this object's {@code topics}.
 *
 * @return the newly created AMQP message
 */
public Message toAmqp() {
  final String publishAddress =
      String.format(AmqpHelper.AMQP_CLIENT_PUBLISH_ADDRESS_TEMPLATE, this.clientId);
  final AmqpValue body = new AmqpValue(this.topics);

  final Message amqp = ProtonHelper.message();
  amqp.setSubject(AMQP_SUBJECT);
  amqp.setCorrelationId(publishAddress);
  amqp.setBody(body);
  return amqp;
}

代码示例来源:origin: eclipse/hono

/**
 * Verifies that processing a registration request results in a message being sent
 * to the registration address on the event bus.
 */
@Test
public void testProcessMessageSendsRequestViaEventBus() {

  // GIVEN an annotated "assert" request carrying a JSON payload
  final AmqpValue body = new AmqpValue(new JsonObject().put("temp", 15).encode());
  final Message request = ProtonHelper.message();
  request.setMessageId("4711");
  request.setSubject(RegistrationConstants.ACTION_ASSERT);
  request.setBody(body);
  MessageHelper.annotate(request, resource);

  // WHEN the endpoint processes the request
  endpoint.processRequest(request, resource, Constants.PRINCIPAL_ANONYMOUS);

  // THEN a message is sent to the registration address via the event bus
  verify(eventBus).send(
      eq(RegistrationConstants.EVENT_BUS_ADDRESS_REGISTRATION_IN),
      any(JsonObject.class),
      any(DeliveryOptions.class));
}
}

代码示例来源:origin: eclipse/hono

/**
 * Verifies that processing a tenant request results in a message being sent to the
 * tenant address on the event bus.
 */
@Test
public void testProcessMessageSendsRequestViaEventBus() {

  // GIVEN an annotated "get" request for the default tenant
  final Message request = ProtonHelper.message();
  request.setMessageId("random-id");
  request.setSubject(TenantConstants.TenantAction.get.toString());
  MessageHelper.addTenantId(request, Constants.DEFAULT_TENANT);
  MessageHelper.annotate(request, resource);

  // WHEN the endpoint processes the request
  endpoint.processRequest(request, resource, Constants.PRINCIPAL_ANONYMOUS);

  // THEN a message is sent to the tenant address via the event bus
  verify(eventBus).send(
      eq(TenantConstants.EVENT_BUS_ADDRESS_TENANT_IN),
      any(JsonObject.class),
      any(DeliveryOptions.class));
}
}

代码示例来源:origin: eclipse/hono

/**
 * Creates a request message that has a message id, a reply-to address and a subject
 * but no body.
 *
 * @param action the credentials action whose string form becomes the message subject.
 * @return the newly created message.
 */
private static Message givenAValidMessageWithoutBody(final CredentialsConstants.CredentialsAction action) {
  final Message request = ProtonHelper.message();

  request.setMessageId("msg");
  request.setReplyTo("reply");
  request.setSubject(action.toString());

  return request;
}
}

代码示例来源:origin: eclipse/hono

/**
 * Creates a request message with a fixed message id and reply-to address and a
 * subject derived from the given action.
 *
 * @param action the tenant action whose string form becomes the message subject.
 * @return the newly created message.
 */
private Message givenAMessageHavingProperties(final TenantConstants.TenantAction action) {
  final Message request = ProtonHelper.message();

  request.setMessageId("msg");
  request.setReplyTo("reply");
  request.setSubject(action.toString());

  return request;
}
}

代码示例来源:origin: eclipse/hono

/**
 * Verifies that a request for an operation which the client is not authorized to
 * invoke is settled with a rejected outcome, that the receiver link is left open
 * and that the request is not processed any further.
 */
@Test
public void testHandleMessageRejectsUnauthorizedRequests() {

  // GIVEN an endpoint whose authorization service denies every request
  final AuthorizationService authService = mock(AuthorizationService.class);
  when(authService.isAuthorized(any(HonoUser.class), any(ResourceIdentifier.class), anyString()))
      .thenReturn(Future.succeededFuture(Boolean.FALSE));

  final Future<Void> processingTracker = Future.future();
  final RequestResponseEndpoint<ServiceConfigProperties> endpoint = getEndpoint(true, processingTracker);
  endpoint.setAuthorizationService(authService);

  final ProtonConnection connection = mock(ProtonConnection.class);
  endpoint.onLinkAttach(connection, sender, REPLY_RESOURCE);

  final Message request = ProtonHelper.message();
  request.setSubject("unauthorized");
  request.setReplyTo(REPLY_RESOURCE.toString());

  // WHEN a request for an operation is received that the client is not authorized to invoke
  final ProtonDelivery delivery = mock(ProtonDelivery.class);
  endpoint.handleMessage(connection, receiver, resource, delivery, request);

  // THEN the message is rejected
  final ArgumentCaptor<DeliveryState> stateCaptor = ArgumentCaptor.forClass(DeliveryState.class);
  verify(delivery).disposition(stateCaptor.capture(), booleanThat(is(Boolean.TRUE)));
  assertThat(stateCaptor.getValue(), instanceOf(Rejected.class));

  // AND the link stays open, the authorization check was made for the requested
  // operation and the request has not been processed
  verify(receiver, never()).close();
  verify(authService).isAuthorized(Constants.PRINCIPAL_ANONYMOUS, resource, "unauthorized");
  assertFalse(processingTracker.isComplete());
}

代码示例来源:origin: eclipse/hono

/**
 * Creates a request message with a fixed message id and reply-to address.
 *
 * @param deviceId the device id to add to the message, or {@code null} to add none.
 * @param action the value to use as the message subject.
 * @return the newly created message.
 */
private static Message givenAMessageHavingProperties(final String deviceId, final String action) {
  final Message request = ProtonHelper.message();

  request.setMessageId("msg-id");
  request.setReplyTo("reply");
  request.setSubject(action);

  // the device id is optional
  if (deviceId == null) {
    return request;
  }
  MessageHelper.addDeviceId(request, deviceId);
  return request;
}
}

代码示例来源:origin: EnMasseProject/enmasse

/**
 * Converts this object into a raw AMQP message.
 *
 * <p>The message carries {@code AMQP_SUBJECT} as its subject, the topic as its address, the
 * retain flag and the QoS value as message annotations, and a header that is marked
 * durable unless the QoS is {@code AT_MOST_ONCE}. A body is attached only when a payload
 * is present.
 *
 * @return the newly created AMQP message
 */
public Message toAmqp() {
  final Map<Symbol, Object> annotations = new HashMap<>();
  annotations.put(Symbol.valueOf(AMQP_RETAIN_ANNOTATION), this.isRetain);
  annotations.put(Symbol.valueOf(AMQP_QOS_ANNOTATION), this.qos.value());

  final Header header = new Header();
  header.setDurable(this.qos != MqttQoS.AT_MOST_ONCE);

  final Message amqp = ProtonHelper.message();
  amqp.setSubject(AMQP_SUBJECT);
  amqp.setMessageAnnotations(new MessageAnnotations(annotations));
  amqp.setAddress(this.topic);
  amqp.setHeader(header);

  // the payload may be absent; in that case the message is left without a body
  if (this.payload != null) {
    amqp.setBody(new Data(new Binary(this.payload.getBytes())));
  }
  return amqp;
}

代码示例来源:origin: EnMasseProject/enmasse

/**
 * Builds the raw AMQP message for this object.
 *
 * <p>Subject: {@code AMQP_SUBJECT}. Address: the topic. Message annotations: the retain
 * flag and the QoS value. Header: durable for every QoS other than {@code AT_MOST_ONCE}.
 * Body: the payload bytes wrapped in a {@code Data} section, omitted when there is no
 * payload.
 *
 * @return the newly created AMQP message
 */
public Message toAmqp() {
  final Message result = ProtonHelper.message();
  result.setSubject(AMQP_SUBJECT);

  final Map<Symbol, Object> annotationValues = new HashMap<>();
  annotationValues.put(Symbol.valueOf(AMQP_RETAIN_ANNOTATION), this.isRetain);
  annotationValues.put(Symbol.valueOf(AMQP_QOS_ANNOTATION), this.qos.value());
  result.setMessageAnnotations(new MessageAnnotations(annotationValues));

  result.setAddress(this.topic);

  final boolean durable = this.qos != MqttQoS.AT_MOST_ONCE;
  final Header header = new Header();
  header.setDurable(durable);
  result.setHeader(header);

  // a missing payload simply means no body section is set
  if (this.payload != null) {
    final Binary bytes = new Binary(this.payload.getBytes());
    result.setBody(new Data(bytes));
  }
  return result;
}

代码示例来源:origin: eclipse/hono

/**
 * Verifies that processing a credentials request results in a message being sent to
 * the credentials address on the event bus.
 */
@Test
public void testProcessMessageSendsRequestViaEventBus() {

  // GIVEN an annotated "add" request for device 4711 of the default tenant
  final Message request = ProtonHelper.message();
  request.setMessageId("random-id");
  request.setSubject(CredentialsConstants.CredentialsAction.add.toString());
  MessageHelper.addDeviceId(request, "4711");
  MessageHelper.addTenantId(request, Constants.DEFAULT_TENANT);
  MessageHelper.annotate(request, resource);

  final AmqpValue body = new AmqpValue(new JsonObject().put("temp", 15).encode());
  request.setBody(body);

  // WHEN the endpoint processes the request
  endpoint.processRequest(request, resource, Constants.PRINCIPAL_ANONYMOUS);

  // THEN a message is sent to the credentials address via the event bus
  verify(eventBus).send(
      eq(CredentialsConstants.EVENT_BUS_ADDRESS_CREDENTIALS_IN),
      any(JsonObject.class),
      any(DeliveryOptions.class));
}
}

代码示例来源:origin: org.eclipse.hono/hono-client

/**
 * {@inheritDoc}
 *
 * <p>If the sender link is not open, the error is logged to the tracing span and a future
 * failed with a {@code ServerErrorException} carrying status
 * {@link HttpURLConnection#HTTP_UNAVAILABLE} is returned.
 */
@Override
public Future<Void> sendOneWayCommand(final String command, final String contentType, final Buffer data, final Map<String, Object> properties) {

  Objects.requireNonNull(command);
  final Span currentSpan = newChildSpan(null, command);

  // Guard: without an open sender link the request cannot be sent.
  if (!sender.isOpen()) {
    TracingHelper.logError(currentSpan, "sender link is not open");
    return Future.failedFuture(new ServerErrorException(
        HttpURLConnection.HTTP_UNAVAILABLE, "sender link is not open"));
  }

  // Assemble the request: application properties, a fresh message id, the command
  // name as subject and the payload.
  final Message request = ProtonHelper.message();
  AbstractHonoClient.setApplicationProperties(request, properties);
  request.setMessageId(createMessageId());
  request.setSubject(command);
  MessageHelper.setPayload(request, contentType, data);

  // The result of the tracker is discarded; callers only see a Future<Void>.
  final Future<BufferResult> responseTracker = Future.future();
  sendRequest(request, responseTracker.completer(), null, currentSpan);
  return responseTracker.map(ignore -> null);
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-io-amqp

/**
 * Verifies that a message whose body consists of 32 Mi space characters keeps its body
 * when it is cloned through the {@code AmqpMessageCoder}.
 */
@Test
public void encodeDecodeLargeMessage() throws Exception {
  Message message = Message.Factory.create();
  message.setAddress("address");
  message.setSubject("subject");
  String body = Joiner.on("").join(Collections.nCopies(32 * 1024 * 1024, " "));
  message.setBody(new AmqpValue(body));

  AmqpMessageCoder coder = AmqpMessageCoder.of();

  Message clone = CoderUtils.clone(coder, message);

  // The original only called equals(...) and discarded the result, so the test could
  // never fail on a corrupted body. Assert the equality instead.
  assertEquals(message.getBody().toString(), clone.getBody().toString());
}
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-io-amqp

/**
 * Verifies that encoding a message whose body consists of 64 Mi space characters fails
 * with a {@code CoderException}.
 */
@Test
public void encodeDecodeTooMuchLargerMessage() throws Exception {
  // must be registered before the call that is expected to throw
  thrown.expect(CoderException.class);

  Message message = Message.Factory.create();
  message.setAddress("address");
  message.setSubject("subject");
  String body = Joiner.on("").join(Collections.nCopies(64 * 1024 * 1024, " "));
  message.setBody(new AmqpValue(body));

  AmqpMessageCoder coder = AmqpMessageCoder.of();

  // only the expected exception matters here, so the encoded bytes are not kept
  CoderUtils.encodeToByteArray(coder, message);
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-io-amqp

/**
 * Verifies that body, address and subject of a message survive being cloned through
 * the {@code AmqpMessageCoder}.
 */
@Test
public void encodeDecode() throws Exception {
  final Message original = Message.Factory.create();
  original.setBody(new AmqpValue("body"));
  original.setAddress("address");
  original.setSubject("test");

  final Message decoded = CoderUtils.clone(AmqpMessageCoder.of(), original);

  assertEquals("AmqpValue{body}", decoded.getBody().toString());
  assertEquals("address", decoded.getAddress());
  assertEquals("test", decoded.getSubject());
}

相关文章