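This article collects code examples for the Java method org.apache.qpid.proton.message.Message.setSubject() and shows how Message.setSubject() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from selected projects and should serve as a useful reference. Details of Message.setSubject() are as follows: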
Package path: org.apache.qpid.proton.message.Message
Class name: Message
Method name: setSubject
Method description: not available
Code example source: EnMasseProject/enmasse
/**
 * Return a raw AMQP message
 *
 * @return the AMQP message
 */
public Message toAmqp() {
    Message message = ProtonHelper.message();
    message.setSubject(AMQP_SUBJECT);
    message.setCorrelationId(String.format(AmqpHelper.AMQP_CLIENT_PUBLISH_ADDRESS_TEMPLATE, this.clientId));
    return message;
}
Code example source: EnMasseProject/enmasse
/**
 * Return a raw AMQP message
 *
 * @return the AMQP message
 */
public Message toAmqp() {
    Message message = ProtonHelper.message();
    message.setSubject(AMQP_SUBJECT);
    message.setCorrelationId(String.format(AmqpHelper.AMQP_CLIENT_PUBLISH_ADDRESS_TEMPLATE, this.clientId));
    message.setReplyTo(String.format(AmqpHelper.AMQP_CLIENT_CONTROL_ADDRESS_TEMPLATE, this.clientId));
    return message;
}
Code example source: EnMasseProject/enmasse
/**
 * Return a raw AMQP message
 *
 * @return the AMQP message
 */
public Message toAmqp() {
    Message message = ProtonHelper.message();
    message.setSubject(AMQP_SUBJECT);
    message.setMessageId(this.messageId);
    return message;
}
Code example source: apache/activemq-artemis
/**
 * Sets the Subject property on an outbound message using the provided String
 *
 * @param subject the String Subject value to set.
 */
public void setSubject(String subject) {
    checkReadOnly();
    lazyCreateProperties();
    getWrappedMessage().setSubject(subject);
}
Code example source: EnMasseProject/enmasse
/**
 * Return a raw AMQP message
 *
 * @return the AMQP message
 */
public Message toAmqp() {
    Message message = ProtonHelper.message();
    message.setSubject(AMQP_SUBJECT);
    message.setCorrelationId(String.format(AmqpHelper.AMQP_CLIENT_PUBLISH_ADDRESS_TEMPLATE, this.clientId));
    // map with topic -> qos (in String format)
    Map<String, String> map = new HashMap<>();
    this.topicSubscriptions.stream().forEach(amqpTopicSubscription -> {
        map.put(amqpTopicSubscription.topic(), String.valueOf(amqpTopicSubscription.qos().value()));
    });
    message.setBody(new AmqpValue(map));
    return message;
}
Code example source: EnMasseProject/enmasse
/**
 * Return a raw AMQP message
 *
 * @return the AMQP message
 */
public Message toAmqp() {
    Message message = ProtonHelper.message();
    message.setSubject(AMQP_SUBJECT);
    // map with topic -> qos (in String format)
    Map<String, String> map = new HashMap<>();
    this.topicSubscriptions.stream().forEach(amqpTopicSubscription -> {
        map.put(amqpTopicSubscription.topic(), String.valueOf(amqpTopicSubscription.qos().value()));
    });
    message.setBody(new AmqpValue(map));
    return message;
}
Code example source: EnMasseProject/enmasse
/**
 * Return a raw AMQP message
 *
 * @return the AMQP message
 */
public Message toAmqp() {
    Message message = ProtonHelper.message();
    message.setSubject(AMQP_SUBJECT);
    message.setCorrelationId(String.format(AmqpHelper.AMQP_CLIENT_PUBLISH_ADDRESS_TEMPLATE, this.clientId));
    message.setBody(new AmqpValue(this.topics));
    return message;
}
Code example source: eclipse/hono
/**
 * Verifies that the endpoint forwards a request message via the event bus.
 */
@Test
public void testProcessMessageSendsRequestViaEventBus() {
    final Message msg = ProtonHelper.message();
    msg.setMessageId("4711");
    msg.setSubject(RegistrationConstants.ACTION_ASSERT);
    msg.setBody(new AmqpValue(new JsonObject().put("temp", 15).encode()));
    MessageHelper.annotate(msg, resource);
    endpoint.processRequest(msg, resource, Constants.PRINCIPAL_ANONYMOUS);
    verify(eventBus).send(eq(RegistrationConstants.EVENT_BUS_ADDRESS_REGISTRATION_IN), any(JsonObject.class), any(DeliveryOptions.class));
}
Code example source: eclipse/hono
/**
 * Verifies that the endpoint forwards a request message via the event bus.
 */
@Test
public void testProcessMessageSendsRequestViaEventBus() {
    final Message msg = ProtonHelper.message();
    msg.setMessageId("random-id");
    msg.setSubject(TenantConstants.TenantAction.get.toString());
    MessageHelper.addTenantId(msg, Constants.DEFAULT_TENANT);
    MessageHelper.annotate(msg, resource);
    endpoint.processRequest(msg, resource, Constants.PRINCIPAL_ANONYMOUS);
    verify(eventBus).send(eq(TenantConstants.EVENT_BUS_ADDRESS_TENANT_IN), any(JsonObject.class), any(DeliveryOptions.class));
}
Code example source: eclipse/hono
private static Message givenAValidMessageWithoutBody(final CredentialsConstants.CredentialsAction action) {
    final Message msg = ProtonHelper.message();
    msg.setMessageId("msg");
    msg.setReplyTo("reply");
    msg.setSubject(action.toString());
    return msg;
}
Code example source: eclipse/hono
private Message givenAMessageHavingProperties(final TenantConstants.TenantAction action) {
    final Message msg = ProtonHelper.message();
    msg.setMessageId("msg");
    msg.setReplyTo("reply");
    msg.setSubject(action.toString());
    return msg;
}
Code example source: eclipse/hono
/**
 * Verifies that the endpoint rejects request messages for operations the client
 * is not authorized to invoke.
 */
@Test
public void testHandleMessageRejectsUnauthorizedRequests() {
    // GIVEN a request whose subject names an operation the client may not invoke
    final Message msg = ProtonHelper.message();
    msg.setSubject("unauthorized");
    msg.setReplyTo(REPLY_RESOURCE.toString());
    final ProtonConnection con = mock(ProtonConnection.class);
    final ProtonDelivery delivery = mock(ProtonDelivery.class);
    final AuthorizationService authService = mock(AuthorizationService.class);
    when(authService.isAuthorized(any(HonoUser.class), any(ResourceIdentifier.class), anyString())).thenReturn(Future.succeededFuture(Boolean.FALSE));
    final Future<Void> processingTracker = Future.future();
    final RequestResponseEndpoint<ServiceConfigProperties> endpoint = getEndpoint(true, processingTracker);
    endpoint.setAuthorizationService(authService);
    endpoint.onLinkAttach(con, sender, REPLY_RESOURCE);
    // WHEN a request for an operation is received that the client is not authorized to invoke
    endpoint.handleMessage(con, receiver, resource, delivery, msg);
    // THEN the message is rejected
    final ArgumentCaptor<DeliveryState> deliveryState = ArgumentCaptor.forClass(DeliveryState.class);
    verify(delivery).disposition(deliveryState.capture(), booleanThat(is(Boolean.TRUE)));
    assertThat(deliveryState.getValue(), instanceOf(Rejected.class));
    verify(receiver, never()).close();
    verify(authService).isAuthorized(Constants.PRINCIPAL_ANONYMOUS, resource, "unauthorized");
    assertFalse(processingTracker.isComplete());
}
Code example source: eclipse/hono
private static Message givenAMessageHavingProperties(final String deviceId, final String action) {
    final Message msg = ProtonHelper.message();
    msg.setMessageId("msg-id");
    msg.setReplyTo("reply");
    msg.setSubject(action);
    if (deviceId != null) {
        MessageHelper.addDeviceId(msg, deviceId);
    }
    return msg;
}
Code example source: EnMasseProject/enmasse
/**
 * Return a raw AMQP message
 *
 * @return the AMQP message
 */
public Message toAmqp() {
    Message message = ProtonHelper.message();
    message.setSubject(AMQP_SUBJECT);
    // carry the MQTT retain flag and QoS level as message annotations
    Map<Symbol, Object> map = new HashMap<>();
    map.put(Symbol.valueOf(AMQP_RETAIN_ANNOTATION), this.isRetain);
    map.put(Symbol.valueOf(AMQP_QOS_ANNOTATION), this.qos.value());
    MessageAnnotations messageAnnotations = new MessageAnnotations(map);
    message.setMessageAnnotations(messageAnnotations);
    message.setAddress(this.topic);
    // the message is durable for every QoS level above AT_MOST_ONCE
    Header header = new Header();
    header.setDurable(this.qos != MqttQoS.AT_MOST_ONCE);
    message.setHeader(header);
    // the payload could be null (or empty)
    if (this.payload != null) {
        message.setBody(new Data(new Binary(this.payload.getBytes())));
    }
    return message;
}
Code example source: eclipse/hono
/**
 * Verifies that the endpoint forwards a request message via the event bus.
 */
@Test
public void testProcessMessageSendsRequestViaEventBus() {
    final Message msg = ProtonHelper.message();
    msg.setMessageId("random-id");
    msg.setSubject(CredentialsConstants.CredentialsAction.add.toString());
    MessageHelper.addDeviceId(msg, "4711");
    MessageHelper.addTenantId(msg, Constants.DEFAULT_TENANT);
    MessageHelper.annotate(msg, resource);
    msg.setBody(new AmqpValue(new JsonObject().put("temp", 15).encode()));
    endpoint.processRequest(msg, resource, Constants.PRINCIPAL_ANONYMOUS);
    verify(eventBus).send(eq(CredentialsConstants.EVENT_BUS_ADDRESS_CREDENTIALS_IN), any(JsonObject.class), any(DeliveryOptions.class));
}
Code example source: org.eclipse.hono/hono-client
/**
 * {@inheritDoc}
 */
@Override
public Future<Void> sendOneWayCommand(final String command, final String contentType, final Buffer data, final Map<String, Object> properties) {
    Objects.requireNonNull(command);
    final Span currentSpan = newChildSpan(null, command);
    if (sender.isOpen()) {
        final Future<BufferResult> responseTracker = Future.future();
        final Message request = ProtonHelper.message();
        AbstractHonoClient.setApplicationProperties(request, properties);
        final String messageId = createMessageId();
        request.setMessageId(messageId);
        // the command name travels in the message subject
        request.setSubject(command);
        MessageHelper.setPayload(request, contentType, data);
        sendRequest(request, responseTracker.completer(), null, currentSpan);
        return responseTracker.map(ignore -> null);
    } else {
        TracingHelper.logError(currentSpan, "sender link is not open");
        return Future.failedFuture(new ServerErrorException(
                HttpURLConnection.HTTP_UNAVAILABLE, "sender link is not open"));
    }
}
Code example source: org.apache.beam/beam-sdks-java-io-amqp
@Test
public void encodeDecodeLargeMessage() throws Exception {
    Message message = Message.Factory.create();
    message.setAddress("address");
    message.setSubject("subject");
    String body = Joiner.on("").join(Collections.nCopies(32 * 1024 * 1024, " "));
    message.setBody(new AmqpValue(body));
    AmqpMessageCoder coder = AmqpMessageCoder.of();
    Message clone = CoderUtils.clone(coder, message);
    // assert on the comparison; the original snippet computed equals() and discarded the result
    assertEquals(message.getBody().toString(), clone.getBody().toString());
}
Code example source: org.apache.beam/beam-sdks-java-io-amqp
@Test
public void encodeDecodeTooMuchLargerMessage() throws Exception {
    // encoding a body this large is expected to fail with a CoderException
    thrown.expect(CoderException.class);
    Message message = Message.Factory.create();
    message.setAddress("address");
    message.setSubject("subject");
    String body = Joiner.on("").join(Collections.nCopies(64 * 1024 * 1024, " "));
    message.setBody(new AmqpValue(body));
    AmqpMessageCoder coder = AmqpMessageCoder.of();
    byte[] encoded = CoderUtils.encodeToByteArray(coder, message);
}
Code example source: org.apache.beam/beam-sdks-java-io-amqp
@Test
public void encodeDecode() throws Exception {
    Message message = Message.Factory.create();
    message.setBody(new AmqpValue("body"));
    message.setAddress("address");
    message.setSubject("test");
    AmqpMessageCoder coder = AmqpMessageCoder.of();
    Message clone = CoderUtils.clone(coder, message);
    // the subject set above must survive the encode/decode round trip
    assertEquals("AmqpValue{body}", clone.getBody().toString());
    assertEquals("address", clone.getAddress());
    assertEquals("test", clone.getSubject());
}
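In the Hono examples above, the subject carries the name of the requested operation (for example action.toString()), and the receiving endpoint acts on the request according to that name. The following is a minimal sketch of such subject-based dispatch, not Hono's actual implementation. To run without dependencies it uses a hypothetical SimpleMessage class in place of org.apache.qpid.proton.message.Message; SubjectDispatchSketch, register, and dispatch are likewise names assumed for illustration only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch only: every type here is hypothetical and stands in for real AMQP classes.
final class SubjectDispatchSketch {

    /** Hypothetical stand-in for a message: a subject naming the operation plus a text body. */
    static final class SimpleMessage {
        private String subject;
        private String body;

        void setSubject(String subject) { this.subject = subject; }
        String getSubject() { return subject; }
        void setBody(String body) { this.body = body; }
        String getBody() { return body; }
    }

    // one handler per known subject value
    private final Map<String, Function<SimpleMessage, String>> handlers = new HashMap<>();

    /** Registers the handler to run for messages carrying the given subject. */
    void register(String subject, Function<SimpleMessage, String> handler) {
        handlers.put(subject, handler);
    }

    /** Looks up the handler by subject; messages with an unknown or missing subject are rejected. */
    String dispatch(SimpleMessage msg) {
        Function<SimpleMessage, String> handler = handlers.get(msg.getSubject());
        if (handler == null) {
            return "rejected: unknown subject " + msg.getSubject();
        }
        return handler.apply(msg);
    }

    public static void main(String[] args) {
        SubjectDispatchSketch dispatcher = new SubjectDispatchSketch();
        dispatcher.register("get", req -> "get:" + req.getBody());

        SimpleMessage msg = new SimpleMessage();
        msg.setSubject("get");
        msg.setBody("tenant-1");
        System.out.println(dispatcher.dispatch(msg));

        // a subject with no registered handler is rejected
        msg.setSubject("unauthorized");
        System.out.println(dispatcher.dispatch(msg));
    }
}
```

With the real Proton API, the sender would call setSubject() on a Message as in the examples above, and the receiver would read the value back with getSubject() before choosing a handler.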