本文整理了Java中org.springframework.amqp.core.Message.getMessageProperties()
方法的一些代码示例,展示了Message.getMessageProperties()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Message.getMessageProperties()
方法的具体详情如下:
包路径:org.springframework.amqp.core.Message
类名称:Message
方法名:getMessageProperties
暂无
代码示例来源:origin: macrozheng/mall
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//给消息设置延迟毫秒值
message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
return message;
}
});
代码示例来源:origin: jmdhappy/xxpay-master
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay((int) delay);
return message;
}
});
代码示例来源:origin: qiurunze123/miaosha
@RabbitListener(queues=MQConfig.MIAOSHATEST)
public void receiveMiaoShaMessage(Message message, Channel channel) throws IOException {
log.info("接受到的消息为:{}",message);
String messRegister = new String(message.getBody(), "UTF-8");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
MiaoShaMessageVo msm = RedisService.stringToBean(messRegister, MiaoShaMessageVo.class);
messageService.insertMs(msm);
}
}
代码示例来源:origin: openzipkin/brave
TraceContextOrSamplingFlags extractAndClearHeaders(Message message) {
MessageProperties messageProperties = message.getMessageProperties();
TraceContextOrSamplingFlags extracted = extractor.extract(messageProperties);
Map<String, Object> headers = messageProperties.getHeaders();
clearHeaders(headers);
return extracted;
}
代码示例来源:origin: yu199195/myth
/**
* Message container simple message listener container.
*
* @return the simple message listener container
*/
@Bean
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "host")
public SimpleMessageListenerContainer messageContainer() {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(2);
container.setConcurrentConsumers(1);
//设置确认模式手工确认
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
byte[] messageBody = message.getBody();
LOGGER.debug("motan 框架接收到的消息");
//确认消息成功消费
final Boolean success = mythMqReceiveService.processMessage(messageBody);
if (success) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
});
return container;
}
代码示例来源:origin: yu199195/myth
@Bean
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "host")
public SimpleMessageListenerContainer messageContainer() {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
//设置确认模式手工确认
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
byte[] messageBody = message.getBody();
//确认消息成功消费
final Boolean success = mythMqReceiveService.processMessage(messageBody);
if (success) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
});
return container;
}
代码示例来源:origin: yu199195/myth
/**
* Message container simple message listener container.
*
* @return the simple message listener container
*/
@Bean
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "host")
public SimpleMessageListenerContainer messageContainer() {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(3);
container.setConcurrentConsumers(1);
//设置确认模式手工确认
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
byte[] messageBody = message.getBody();
LogUtil.debug(LOGGER,()->"springcloud account服务 amqp接收消息");
//确认消息成功消费
final Boolean success = mythMqReceiveService.processMessage(messageBody);
if (success) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
});
return container;
}
代码示例来源:origin: openzipkin/brave
/**
* MethodInterceptor for {@link SimpleMessageListenerContainer.ContainerDelegate#invokeListener(Channel,
* Message)}
*/
@Override public Object invoke(MethodInvocation methodInvocation) throws Throwable {
Message message = (Message) methodInvocation.getArguments()[1];
TraceContextOrSamplingFlags extracted = springRabbitTracing.extractAndClearHeaders(message);
// named for BlockingQueueConsumer.nextMessage, which we can't currently see
Span consumerSpan = tracer.nextSpan(extracted);
Span listenerSpan = tracer.newChild(consumerSpan.context());
if (!consumerSpan.isNoop()) {
setConsumerSpan(consumerSpan, message.getMessageProperties());
// incur timestamp overhead only once
long timestamp = tracing.clock(consumerSpan.context()).currentTimeMicroseconds();
consumerSpan.start(timestamp);
long consumerFinish = timestamp + 1L; // save a clock reading
consumerSpan.finish(consumerFinish);
// not using scoped span as we want to start with a pre-configured time
listenerSpan.name("on-message").start(consumerFinish);
}
try (SpanInScope ws = tracer.withSpanInScope(listenerSpan)) {
return methodInvocation.proceed();
} catch (Throwable t) {
listenerSpan.error(t);
throw t;
} finally {
listenerSpan.finish();
}
}
代码示例来源:origin: vector4wang/spring-boot-quick
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties()
.setExpiration(ttl.toString()); // 设置per-message的失效时间
return message;
}
代码示例来源:origin: openzipkin/brave
@Override public Message postProcessMessage(Message message) {
TraceContext maybeParent = currentTraceContext.get();
// Unlike message consumers, we try current span before trying extraction. This is the proper
// order because the span in scope should take precedence over a potentially stale header entry.
//
// NOTE: Brave instrumentation used properly does not result in stale header entries, as we
// always clear message headers after reading.
Span span;
if (maybeParent == null) {
span = tracer.nextSpan(springRabbitTracing.extractAndClearHeaders(message));
} else {
// If we have a span in scope assume headers were cleared before
span = tracer.newChild(maybeParent);
}
if (!span.isNoop()) {
span.kind(Span.Kind.PRODUCER).name("publish");
if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
// incur timestamp overhead only once
long timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
span.start(timestamp).finish(timestamp);
}
injector.inject(span.context(), message.getMessageProperties());
return message;
}
}
代码示例来源:origin: FlowCI/flow-platform
public PriorityMessage(Message message) {
super(message.getBody(), message.getMessageProperties());
this.timestamp = System.nanoTime();
}
代码示例来源:origin: spring-projects/spring-integration
protected void addDelayProperty(Message<?> message, org.springframework.amqp.core.Message amqpMessage) {
if (this.delayGenerator != null) {
amqpMessage.getMessageProperties().setDelay(this.delayGenerator.processMessage(message));
}
}
代码示例来源:origin: vector4wang/spring-boot-quick
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration(msg.getTtl() + "");
return message;
}
});
代码示例来源:origin: spring-projects/spring-integration
private org.springframework.messaging.Message<Object> createMessage(Message message, Channel channel) {
Object payload = AmqpInboundChannelAdapter.this.messageConverter.fromMessage(message);
Map<String, Object> headers = AmqpInboundChannelAdapter.this.headerMapper
.toHeadersFromRequest(message.getMessageProperties());
if (AmqpInboundChannelAdapter.this.messageListenerContainer.getAcknowledgeMode()
== AcknowledgeMode.MANUAL) {
headers.put(AmqpHeaders.DELIVERY_TAG, message.getMessageProperties().getDeliveryTag());
headers.put(AmqpHeaders.CHANNEL, channel);
}
if (AmqpInboundChannelAdapter.this.retryTemplate != null) {
headers.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, new AtomicInteger());
}
final org.springframework.messaging.Message<Object> messagingMessage = getMessageBuilderFactory()
.withPayload(payload)
.copyHeaders(headers)
.build();
return messagingMessage;
}
代码示例来源:origin: spring-projects/spring-integration
protected Message<Object> buildMessage(org.springframework.amqp.core.Message message, Object converted) {
AbstractIntegrationMessageBuilder<Object> messageBuilder =
this.messageBuilderFactory.withPayload(converted);
if (this.channel.isExtractPayload()) {
Map<String, Object> headers =
this.inboundHeaderMapper.toHeadersFromRequest(message.getMessageProperties());
messageBuilder.copyHeaders(headers);
}
return messageBuilder.build();
}
代码示例来源:origin: spring-projects/spring-integration
private void send(String exchangeName, String routingKey,
final Message<?> requestMessage, CorrelationData correlationData) {
if (this.amqpTemplate instanceof RabbitTemplate) {
MessageConverter converter = ((RabbitTemplate) this.amqpTemplate).getMessageConverter();
org.springframework.amqp.core.Message amqpMessage = MappingUtils.mapMessage(requestMessage, converter,
getHeaderMapper(), getDefaultDeliveryMode(), isHeadersMappedLast());
addDelayProperty(requestMessage, amqpMessage);
((RabbitTemplate) this.amqpTemplate).send(exchangeName, routingKey, amqpMessage, correlationData);
}
else {
this.amqpTemplate.convertAndSend(exchangeName, routingKey, requestMessage.getPayload(),
message -> {
getHeaderMapper().fromHeadersToRequest(requestMessage.getHeaders(),
message.getMessageProperties());
return message;
});
}
}
代码示例来源:origin: spring-projects/spring-integration
protected AbstractIntegrationMessageBuilder<?> buildReply(MessageConverter converter,
org.springframework.amqp.core.Message amqpReplyMessage) {
Object replyObject = converter.fromMessage(amqpReplyMessage);
AbstractIntegrationMessageBuilder<?> builder = (replyObject instanceof Message)
? this.getMessageBuilderFactory().fromMessage((Message<?>) replyObject)
: this.getMessageBuilderFactory().withPayload(replyObject);
Map<String, ?> headers = getHeaderMapper().toHeadersFromReply(amqpReplyMessage.getMessageProperties());
builder.copyHeadersIfAbsent(headers);
return builder;
}
代码示例来源:origin: spring-projects/spring-integration
@Test
public void testAsyncDelayExpression() {
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
AsyncRabbitTemplate amqpTemplate = spy(new AsyncRabbitTemplate(new RabbitTemplate(connectionFactory),
new SimpleMessageListenerContainer(connectionFactory), "replyTo"));
amqpTemplate.setTaskScheduler(mock(TaskScheduler.class));
AsyncAmqpOutboundGateway gateway = new AsyncAmqpOutboundGateway(amqpTemplate);
willAnswer(
invocation -> amqpTemplate.new RabbitMessageFuture("foo", invocation.getArgument(2)))
.given(amqpTemplate).sendAndReceive(anyString(), anyString(), any(Message.class));
gateway.setExchangeName("foo");
gateway.setRoutingKey("bar");
gateway.setDelayExpressionString("42");
gateway.setBeanFactory(mock(BeanFactory.class));
gateway.setOutputChannel(new NullChannel());
gateway.afterPropertiesSet();
gateway.start();
ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
gateway.handleMessage(new GenericMessage<>("foo"));
verify(amqpTemplate).sendAndReceive(eq("foo"), eq("bar"), captor.capture());
assertThat(captor.getValue().getMessageProperties().getDelay(), equalTo(42));
}
代码示例来源:origin: spring-projects/spring-integration
@Test
public void testHeaderMapperWinsAdapter() {
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
RabbitTemplate amqpTemplate = spy(new RabbitTemplate(connectionFactory));
AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(amqpTemplate);
endpoint.setHeadersMappedLast(true);
final AtomicReference<Message> amqpMessage =
new AtomicReference<Message>();
willAnswer(invocation -> {
amqpMessage.set(invocation.getArgument(2));
return null;
}).given(amqpTemplate).send(isNull(), isNull(), any(Message.class), isNull());
org.springframework.messaging.Message<?> message = MessageBuilder.withPayload("foo")
.setHeader(MessageHeaders.CONTENT_TYPE, "bar")
.build();
endpoint.handleMessage(message);
assertNotNull(amqpMessage.get());
assertEquals("bar", amqpMessage.get().getMessageProperties().getContentType());
}
代码示例来源:origin: spring-projects/spring-integration
protected Message<?> buildReturnedMessage(org.springframework.amqp.core.Message message,
int replyCode, String replyText, String exchange, String routingKey, MessageConverter converter) {
Object returnedObject = converter.fromMessage(message);
AbstractIntegrationMessageBuilder<?> builder = (returnedObject instanceof Message)
? this.getMessageBuilderFactory().fromMessage((Message<?>) returnedObject)
: this.getMessageBuilderFactory().withPayload(returnedObject);
Map<String, ?> headers = getHeaderMapper().toHeadersFromReply(message.getMessageProperties());
if (this.errorMessageStrategy == null) {
builder.copyHeadersIfAbsent(headers)
.setHeader(AmqpHeaders.RETURN_REPLY_CODE, replyCode)
.setHeader(AmqpHeaders.RETURN_REPLY_TEXT, replyText)
.setHeader(AmqpHeaders.RETURN_EXCHANGE, exchange)
.setHeader(AmqpHeaders.RETURN_ROUTING_KEY, routingKey);
}
Message<?> returnedMessage = builder.build();
if (this.errorMessageStrategy != null) {
returnedMessage = this.errorMessageStrategy.buildErrorMessage(new ReturnedAmqpMessageException(
returnedMessage, message, replyCode, replyText, exchange, routingKey), null);
}
return returnedMessage;
}
内容来源于网络,如有侵权,请联系作者删除!