org.springframework.amqp.core.Message.getMessageProperties()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(13.2k)|赞(0)|评价(0)|浏览(149)

本文整理了Java中org.springframework.amqp.core.Message.getMessageProperties()方法的一些代码示例,展示了Message.getMessageProperties()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Message.getMessageProperties()方法的具体详情如下:
包路径:org.springframework.amqp.core.Message
类名称:Message
方法名:getMessageProperties

Message.getMessageProperties介绍

暂无

代码示例

代码示例来源:origin: macrozheng/mall

@Override
  public Message postProcessMessage(Message message) throws AmqpException {
    //给消息设置延迟毫秒值
    message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
    return message;
  }
});

代码示例来源:origin: jmdhappy/xxpay-master

public Message postProcessMessage(Message message) throws AmqpException {
    message.getMessageProperties().setDelay((int) delay);
    return message;
  }
});

代码示例来源:origin: qiurunze123/miaosha

@RabbitListener(queues=MQConfig.MIAOSHATEST)
  public void receiveMiaoShaMessage(Message message, Channel channel) throws IOException {
    log.info("接受到的消息为:{}",message);
    String messRegister = new String(message.getBody(), "UTF-8");
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    MiaoShaMessageVo msm  = RedisService.stringToBean(messRegister, MiaoShaMessageVo.class);
    messageService.insertMs(msm);
    }
}

代码示例来源:origin: openzipkin/brave

TraceContextOrSamplingFlags extractAndClearHeaders(Message message) {
 MessageProperties messageProperties = message.getMessageProperties();
 TraceContextOrSamplingFlags extracted = extractor.extract(messageProperties);
 Map<String, Object> headers = messageProperties.getHeaders();
 clearHeaders(headers);
 return extracted;
}

代码示例来源:origin: yu199195/myth

/**
 * Message container simple message listener container.
 *
 * @return the simple message listener container
 */
@Bean
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "host")
public SimpleMessageListenerContainer messageContainer() {
  SimpleMessageListenerContainer container =
      new SimpleMessageListenerContainer(connectionFactory);
  container.setQueues(queue());
  container.setExposeListenerChannel(true);
  container.setMaxConcurrentConsumers(2);
  container.setConcurrentConsumers(1);
  //设置确认模式手工确认
  container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
    byte[] messageBody = message.getBody();
    LOGGER.debug("motan 框架接收到的消息");
    //确认消息成功消费
    final Boolean success = mythMqReceiveService.processMessage(messageBody);
    if (success) {
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
  });
  return container;
}

代码示例来源:origin: yu199195/myth

@Bean
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "host")
public SimpleMessageListenerContainer messageContainer() {
  SimpleMessageListenerContainer container =
      new SimpleMessageListenerContainer(connectionFactory);
  container.setQueues(queue());
  container.setExposeListenerChannel(true);
  container.setMaxConcurrentConsumers(1);
  container.setConcurrentConsumers(1);
  //设置确认模式手工确认
  container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
    byte[] messageBody = message.getBody();
    //确认消息成功消费
    final Boolean success = mythMqReceiveService.processMessage(messageBody);
    if (success) {
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
  });
  return container;
}

代码示例来源:origin: yu199195/myth

/**
 * Message container simple message listener container.
 *
 * @return the simple message listener container
 */
@Bean
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "host")
public SimpleMessageListenerContainer messageContainer() {
  SimpleMessageListenerContainer container =
      new SimpleMessageListenerContainer(connectionFactory);
  container.setQueues(queue());
  container.setExposeListenerChannel(true);
  container.setMaxConcurrentConsumers(3);
  container.setConcurrentConsumers(1);
  //设置确认模式手工确认
  container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
    byte[] messageBody = message.getBody();
    LogUtil.debug(LOGGER,()->"springcloud  account服务  amqp接收消息");
    //确认消息成功消费
    final Boolean success = mythMqReceiveService.processMessage(messageBody);
    if (success) {
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
  });
  return container;
}

代码示例来源:origin: openzipkin/brave

/**
 * MethodInterceptor for {@link SimpleMessageListenerContainer.ContainerDelegate#invokeListener(Channel,
 * Message)}
 */
@Override public Object invoke(MethodInvocation methodInvocation) throws Throwable {
 Message message = (Message) methodInvocation.getArguments()[1];
 TraceContextOrSamplingFlags extracted = springRabbitTracing.extractAndClearHeaders(message);
 // named for BlockingQueueConsumer.nextMessage, which we can't currently see
 Span consumerSpan = tracer.nextSpan(extracted);
 Span listenerSpan = tracer.newChild(consumerSpan.context());
 if (!consumerSpan.isNoop()) {
  setConsumerSpan(consumerSpan, message.getMessageProperties());
  // incur timestamp overhead only once
  long timestamp = tracing.clock(consumerSpan.context()).currentTimeMicroseconds();
  consumerSpan.start(timestamp);
  long consumerFinish = timestamp + 1L; // save a clock reading
  consumerSpan.finish(consumerFinish);
  // not using scoped span as we want to start with a pre-configured time
  listenerSpan.name("on-message").start(consumerFinish);
 }
 try (SpanInScope ws = tracer.withSpanInScope(listenerSpan)) {
  return methodInvocation.proceed();
 } catch (Throwable t) {
  listenerSpan.error(t);
  throw t;
 } finally {
  listenerSpan.finish();
 }
}

代码示例来源:origin: vector4wang/spring-boot-quick

@Override
public Message postProcessMessage(Message message) throws AmqpException {
  message.getMessageProperties()
      .setExpiration(ttl.toString()); // 设置per-message的失效时间
  return message;
}

代码示例来源:origin: openzipkin/brave

@Override public Message postProcessMessage(Message message) {
  TraceContext maybeParent = currentTraceContext.get();
  // Unlike message consumers, we try current span before trying extraction. This is the proper
  // order because the span in scope should take precedence over a potentially stale header entry.
  //
  // NOTE: Brave instrumentation used properly does not result in stale header entries, as we
  // always clear message headers after reading.
  Span span;
  if (maybeParent == null) {
   span = tracer.nextSpan(springRabbitTracing.extractAndClearHeaders(message));
  } else {
   // If we have a span in scope assume headers were cleared before
   span = tracer.newChild(maybeParent);
  }

  if (!span.isNoop()) {
   span.kind(Span.Kind.PRODUCER).name("publish");
   if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
   // incur timestamp overhead only once
   long timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
   span.start(timestamp).finish(timestamp);
  }

  injector.inject(span.context(), message.getMessageProperties());
  return message;
 }
}

代码示例来源:origin: FlowCI/flow-platform

public PriorityMessage(Message message) {
  super(message.getBody(), message.getMessageProperties());
  this.timestamp = System.nanoTime();
}

代码示例来源:origin: spring-projects/spring-integration

protected void addDelayProperty(Message<?> message, org.springframework.amqp.core.Message amqpMessage) {
  if (this.delayGenerator != null) {
    amqpMessage.getMessageProperties().setDelay(this.delayGenerator.processMessage(message));
  }
}

代码示例来源:origin: vector4wang/spring-boot-quick

@Override
  public Message postProcessMessage(Message message) throws AmqpException {
    message.getMessageProperties().setExpiration(msg.getTtl() + "");
    return message;
  }
});

代码示例来源:origin: spring-projects/spring-integration

private org.springframework.messaging.Message<Object> createMessage(Message message, Channel channel) {
  Object payload = AmqpInboundChannelAdapter.this.messageConverter.fromMessage(message);
  Map<String, Object> headers = AmqpInboundChannelAdapter.this.headerMapper
      .toHeadersFromRequest(message.getMessageProperties());
  if (AmqpInboundChannelAdapter.this.messageListenerContainer.getAcknowledgeMode()
      == AcknowledgeMode.MANUAL) {
    headers.put(AmqpHeaders.DELIVERY_TAG, message.getMessageProperties().getDeliveryTag());
    headers.put(AmqpHeaders.CHANNEL, channel);
  }
  if (AmqpInboundChannelAdapter.this.retryTemplate != null) {
    headers.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, new AtomicInteger());
  }
  final org.springframework.messaging.Message<Object> messagingMessage = getMessageBuilderFactory()
      .withPayload(payload)
      .copyHeaders(headers)
      .build();
  return messagingMessage;
}

代码示例来源:origin: spring-projects/spring-integration

protected Message<Object> buildMessage(org.springframework.amqp.core.Message message, Object converted) {
  AbstractIntegrationMessageBuilder<Object> messageBuilder =
      this.messageBuilderFactory.withPayload(converted);
  if (this.channel.isExtractPayload()) {
    Map<String, Object> headers =
        this.inboundHeaderMapper.toHeadersFromRequest(message.getMessageProperties());
    messageBuilder.copyHeaders(headers);
  }
  return messageBuilder.build();
}

代码示例来源:origin: spring-projects/spring-integration

private void send(String exchangeName, String routingKey,
    final Message<?> requestMessage, CorrelationData correlationData) {
  if (this.amqpTemplate instanceof RabbitTemplate) {
    MessageConverter converter = ((RabbitTemplate) this.amqpTemplate).getMessageConverter();
    org.springframework.amqp.core.Message amqpMessage = MappingUtils.mapMessage(requestMessage, converter,
        getHeaderMapper(), getDefaultDeliveryMode(), isHeadersMappedLast());
    addDelayProperty(requestMessage, amqpMessage);
    ((RabbitTemplate) this.amqpTemplate).send(exchangeName, routingKey, amqpMessage, correlationData);
  }
  else {
    this.amqpTemplate.convertAndSend(exchangeName, routingKey, requestMessage.getPayload(),
        message -> {
          getHeaderMapper().fromHeadersToRequest(requestMessage.getHeaders(),
              message.getMessageProperties());
          return message;
        });
  }
}

代码示例来源:origin: spring-projects/spring-integration

protected AbstractIntegrationMessageBuilder<?> buildReply(MessageConverter converter,
    org.springframework.amqp.core.Message amqpReplyMessage) {
  Object replyObject = converter.fromMessage(amqpReplyMessage);
  AbstractIntegrationMessageBuilder<?> builder = (replyObject instanceof Message)
      ? this.getMessageBuilderFactory().fromMessage((Message<?>) replyObject)
      : this.getMessageBuilderFactory().withPayload(replyObject);
  Map<String, ?> headers = getHeaderMapper().toHeadersFromReply(amqpReplyMessage.getMessageProperties());
  builder.copyHeadersIfAbsent(headers);
  return builder;
}

代码示例来源:origin: spring-projects/spring-integration

@Test
public void testAsyncDelayExpression() {
  ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
  AsyncRabbitTemplate amqpTemplate = spy(new AsyncRabbitTemplate(new RabbitTemplate(connectionFactory),
      new SimpleMessageListenerContainer(connectionFactory), "replyTo"));
  amqpTemplate.setTaskScheduler(mock(TaskScheduler.class));
  AsyncAmqpOutboundGateway gateway = new AsyncAmqpOutboundGateway(amqpTemplate);
  willAnswer(
      invocation -> amqpTemplate.new RabbitMessageFuture("foo", invocation.getArgument(2)))
        .given(amqpTemplate).sendAndReceive(anyString(), anyString(), any(Message.class));
  gateway.setExchangeName("foo");
  gateway.setRoutingKey("bar");
  gateway.setDelayExpressionString("42");
  gateway.setBeanFactory(mock(BeanFactory.class));
  gateway.setOutputChannel(new NullChannel());
  gateway.afterPropertiesSet();
  gateway.start();
  ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
  gateway.handleMessage(new GenericMessage<>("foo"));
  verify(amqpTemplate).sendAndReceive(eq("foo"), eq("bar"), captor.capture());
  assertThat(captor.getValue().getMessageProperties().getDelay(), equalTo(42));
}

代码示例来源:origin: spring-projects/spring-integration

@Test
public void testHeaderMapperWinsAdapter() {
  ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
  RabbitTemplate amqpTemplate = spy(new RabbitTemplate(connectionFactory));
  AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(amqpTemplate);
  endpoint.setHeadersMappedLast(true);
  final AtomicReference<Message> amqpMessage =
      new AtomicReference<Message>();
  willAnswer(invocation -> {
    amqpMessage.set(invocation.getArgument(2));
    return null;
  }).given(amqpTemplate).send(isNull(), isNull(), any(Message.class), isNull());
  org.springframework.messaging.Message<?> message = MessageBuilder.withPayload("foo")
      .setHeader(MessageHeaders.CONTENT_TYPE, "bar")
      .build();
  endpoint.handleMessage(message);
  assertNotNull(amqpMessage.get());
  assertEquals("bar", amqpMessage.get().getMessageProperties().getContentType());
}

代码示例来源:origin: spring-projects/spring-integration

protected Message<?> buildReturnedMessage(org.springframework.amqp.core.Message message,
    int replyCode, String replyText, String exchange, String routingKey, MessageConverter converter) {
  Object returnedObject = converter.fromMessage(message);
  AbstractIntegrationMessageBuilder<?> builder = (returnedObject instanceof Message)
      ? this.getMessageBuilderFactory().fromMessage((Message<?>) returnedObject)
      : this.getMessageBuilderFactory().withPayload(returnedObject);
  Map<String, ?> headers = getHeaderMapper().toHeadersFromReply(message.getMessageProperties());
  if (this.errorMessageStrategy == null) {
    builder.copyHeadersIfAbsent(headers)
        .setHeader(AmqpHeaders.RETURN_REPLY_CODE, replyCode)
        .setHeader(AmqpHeaders.RETURN_REPLY_TEXT, replyText)
        .setHeader(AmqpHeaders.RETURN_EXCHANGE, exchange)
        .setHeader(AmqpHeaders.RETURN_ROUTING_KEY, routingKey);
  }
  Message<?> returnedMessage = builder.build();
  if (this.errorMessageStrategy != null) {
    returnedMessage = this.errorMessageStrategy.buildErrorMessage(new ReturnedAmqpMessageException(
        returnedMessage, message, replyCode, replyText, exchange, routingKey), null);
  }
  return returnedMessage;
}

相关文章