org.apache.activemq.command.Message类的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(17.1k)|赞(0)|评价(0)|浏览(176)

本文整理了Java中org.apache.activemq.command.Message类的一些代码示例,展示了Message类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Message类的具体详情如下:
包路径:org.apache.activemq.command.Message
类名称:Message

Message介绍

[英]Represents an ActiveMQ message
[中]表示ActiveMQ消息

代码示例

代码示例来源:origin: apache/activemq

/**
 * Re-addresses a message to {@code deadLetterDestination} and sends it through the broker
 * attached to {@code context}.
 *
 * <p>The message's current destination and transaction id are saved as its "original"
 * values; its transaction id, memory usage and data locator are cleared and its redelivery
 * counter is reset to zero. Producer flow control on {@code context} is switched off for
 * the duration of the send and restored afterwards, even if the send fails.
 *
 * @param context connection context whose broker performs the send
 * @param originalMessage the message to resend
 * @param deadLetterDestination the destination the message is re-addressed to
 * @param copy when {@code true} a copy of {@code originalMessage} is modified and sent;
 *     otherwise {@code originalMessage} itself is modified and sent
 * @throws Exception if the broker send fails
 */
public static void doResend(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination, boolean copy) throws Exception {
  final Message outbound;
  if (copy) {
    outbound = originalMessage.copy();
  } else {
    outbound = originalMessage;
  }

  // Capture where the message came from before it is re-addressed.
  outbound.setOriginalDestination(outbound.getDestination());
  outbound.setOriginalTransactionId(outbound.getTransactionId());
  outbound.setDestination(deadLetterDestination);
  outbound.setTransactionId(null);
  outbound.setMemoryUsage(null);
  outbound.setRedeliveryCounter(0);
  outbound.getMessageId().setDataLocator(null);

  final boolean flowControlBefore = context.isProducerFlowControl();
  try {
    context.setProducerFlowControl(false);
    final ProducerBrokerExchange exchange = new ProducerBrokerExchange();
    exchange.setProducerState(new ProducerState(new ProducerInfo()));
    exchange.setMutable(true);
    exchange.setConnectionContext(context);
    context.getBroker().send(exchange, outbound);
  } finally {
    // Always put the caller's flow-control setting back.
    context.setProducerFlowControl(flowControlBefore);
  }
}

代码示例来源:origin: apache/activemq

/**
 * Re-stamps the message with the broker's clock before passing it on to {@code super.send}.
 *
 * <p>Re-stamping only happens when the message carries a positive timestamp, is not bound
 * for a DLQ (per {@code isDestinationDLQ}), and either {@code processNetworkMessages} is set
 * or the message has an empty/absent broker path. The time-to-live is derived from the old
 * expiration and timestamp (or {@code zeroExpirationOverride} when no expiration is set) and
 * is capped at {@code ttlCeiling} when that ceiling is positive.
 *
 * @param producerExchange the producer exchange forwarded unchanged to {@code super.send}
 * @param message the message to re-stamp and forward
 * @throws Exception if the downstream send fails
 */
@Override
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
  final boolean restamp = message.getTimestamp() > 0
    && !isDestinationDLQ(message)
    && (processNetworkMessages || message.getBrokerPath() == null || message.getBrokerPath().length == 0);

  if (restamp) {
    // timestamp not been disabled and has not passed through a network or processNetworkMessages=true
    final long oldTimestamp = message.getTimestamp();
    final long oldExpiration = message.getExpiration();
    final long now = System.currentTimeMillis();

    long ttl = (oldExpiration > 0) ? (oldExpiration - oldTimestamp) : zeroExpirationOverride;
    if (ttlCeiling > 0 && ttl > ttlCeiling) {
      ttl = ttlCeiling;
    }
    final long newExpiration = ttl + now;

    // In the scenario that the Broker is behind the clients we never want to set the
    // Timestamp and Expiration in the past
    final boolean movesForward = !futureOnly || newExpiration > oldExpiration;
    if (movesForward) {
      if (ttl > 0 && newExpiration > 0) {
        message.setExpiration(newExpiration);
      }
      message.setTimestamp(now);
      LOG.debug("Set message {} timestamp from {} to {}", new Object[]{ message.getMessageId(), oldTimestamp, now });
    }
  }

  super.send(producerExchange, message);
}

代码示例来源:origin: apache/activemq

/**
 * Rebuilds a message from its stored bytes, stamps {@code sequenceId} onto its message id,
 * and hands it to the recovery listener.
 *
 * @param sequenceId the sequence id applied as broker sequence id, future-or-sequence value
 *     and entry locator of the message id
 * @param data the marshalled message bytes
 * @return always {@code true}
 * @throws Exception if unmarshalling or the listener callback fails
 */
@Override
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
    final ByteSequence stored = new ByteSequence(data);
    final Message recovered = (Message) wireFormat.unmarshal(stored);

    // The same sequence id is recorded in all three slots of the message id.
    recovered.getMessageId().setBrokerSequenceId(sequenceId);
    recovered.getMessageId().setFutureOrSequenceLong(sequenceId);
    recovered.getMessageId().setEntryLocator(sequenceId);

    listener.recoverMessage(recovered);
    trackLastRecovered(sequenceId, recovered.getPriority());
    return true;
}

代码示例来源:origin: apache/activemq

/**
 * Copies {@code original}; when {@code setOriginalDestination} is enabled the copy is
 * re-addressed to {@code target} and remembers the original's destination.
 *
 * @param original the message to copy
 * @param target the destination applied to the copy when re-addressing is enabled
 * @return the copied message
 */
private Message copy(Message original, ActiveMQDestination target) {
  final Message duplicate = original.copy();
  if (!setOriginalDestination) {
    return duplicate;
  }
  duplicate.setDestination(target);
  duplicate.setOriginalDestination(original.getDestination());
  return duplicate;
}

代码示例来源:origin: apache/activemq

/**
 * Records the message's current expiration under the {@code ORIGINAL_EXPIRATION} property,
 * unless that property is already present.
 *
 * @param message the message to stamp
 * @return {@code true} if the property was written by this call, {@code false} if the
 *     message already carried it
 * @throws IOException if reading or writing the message property fails
 */
private boolean stampAsExpired(Message message) throws IOException {
  if (message.getProperty(ORIGINAL_EXPIRATION) != null) {
    // Already stamped; keep the first recorded value.
    return false;
  }
  // Long.valueOf replaces the deprecated Long(long) boxing constructor.
  message.setProperty(ORIGINAL_EXPIRATION, Long.valueOf(message.getExpiration()));
  return true;
}

代码示例来源:origin: apache/activemq

/**
 * Logs the message id and destination at debug level and declines the dead letter queue.
 *
 * @param message the message being considered for the dead letter queue
 * @return always {@code false}
 */
@Override
  public boolean isSendToDeadLetterQueue(Message message) {
    LOG.debug("Discarding message sent to DLQ: {}, dest: {}", message.getMessageId(), message.getDestination());
    return false;
  }
}

代码示例来源:origin: apache/activemq

// Excerpt (enclosing method signature not shown): writes the fields of `info` to `dataOut`
// one after another using the loose-marshalling helpers and direct DataOutput-style writes.
// NOTE(review): the write order presumably defines the wire layout and must match the
// corresponding unmarshal code — confirm before reordering anything.
info.beforeMarshall(wireFormat);
// Addressing / identity fields.
looseMarshalCachedObject(wireFormat, (DataStructure)info.getProducerId(), dataOut);
looseMarshalCachedObject(wireFormat, (DataStructure)info.getDestination(), dataOut);
looseMarshalCachedObject(wireFormat, (DataStructure)info.getTransactionId(), dataOut);
looseMarshalCachedObject(wireFormat, (DataStructure)info.getOriginalDestination(), dataOut);
looseMarshalNestedObject(wireFormat, (DataStructure)info.getMessageId(), dataOut);
looseMarshalCachedObject(wireFormat, (DataStructure)info.getOriginalTransactionId(), dataOut);
// Header-style fields (group, correlation, persistence, expiration, priority, ...).
looseMarshalString(info.getGroupID(), dataOut);
dataOut.writeInt(info.getGroupSequence());
looseMarshalString(info.getCorrelationId(), dataOut);
dataOut.writeBoolean(info.isPersistent());
looseMarshalLong(wireFormat, info.getExpiration(), dataOut);
dataOut.writeByte(info.getPriority());
looseMarshalNestedObject(wireFormat, (DataStructure)info.getReplyTo(), dataOut);
looseMarshalLong(wireFormat, info.getTimestamp(), dataOut);
looseMarshalString(info.getType(), dataOut);
// Payload and marshalled properties.
looseMarshalByteSequence(wireFormat, info.getContent(), dataOut);
looseMarshalByteSequence(wireFormat, info.getMarshalledProperties(), dataOut);
looseMarshalNestedObject(wireFormat, (DataStructure)info.getDataStructure(), dataOut);
looseMarshalCachedObject(wireFormat, (DataStructure)info.getTargetConsumerId(), dataOut);
// Delivery-related flags and counters.
dataOut.writeBoolean(info.isCompressed());
dataOut.writeInt(info.getRedeliveryCounter());
looseMarshalObjectArray(wireFormat, info.getBrokerPath(), dataOut);
looseMarshalLong(wireFormat, info.getArrival(), dataOut);
looseMarshalString(info.getUserID(), dataOut);
dataOut.writeBoolean(info.isRecievedByDFBridge());
dataOut.writeBoolean(info.isDroppable());

代码示例来源:origin: apache/activemq

/**
 * Unmarshals the payload of a scheduled job into a message, re-addresses it to
 * {@code replyTo}, and sends it through {@code next} with producer flow control disabled.
 *
 * <p>NOTE(review): this excerpt is truncated — the closing braces and any catch clause of
 * the outer {@code try} are not shown.
 *
 * @param context connection context used for the send
 * @param job the scheduled job whose payload and id are used
 * @param replyTo the destination the rebuilt message is sent to
 * @throws Exception declared by the method; the visible code does not show which calls throw
 */
protected void sendScheduledJob(ConnectionContext context, Job job, ActiveMQDestination replyTo) throws Exception {
  org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getPayload());
  try {
    Message msg = (Message) this.wireFormat.unmarshal(packet);
    msg.setOriginalTransactionId(null);
    msg.setPersistent(false);
    msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
    // Give the outgoing message a fresh id issued by this component's producer.
    msg.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
    // Keep the job's own destination as the original destination before redirecting.
    msg.setOriginalDestination(msg.getDestination());
    msg.setDestination(replyTo);
    msg.setResponseRequired(false);
    msg.setProducerId(this.producerId);
    msg.setProperty("scheduledJobId", job.getJobId());
    final boolean originalFlowControl = context.isProducerFlowControl();
    final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
    producerExchange.setConnectionContext(context);
    producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
    try {
      context.setProducerFlowControl(false);
      this.next.send(producerExchange, msg);
    } finally {
      // Restore the caller's flow-control setting whether or not the send succeeded.
      context.setProducerFlowControl(originalFlowControl);

代码示例来源:origin: apache/activemq

// Excerpt (lines elided, braces unbalanced): for a non-persistent message, when
// isSendAdvisoryIfNoConsumers() is true, a copy of the message is redirected to a
// "no consumers" advisory topic chosen by destination type.
if (!msg.isPersistent()) {
  if (isSendAdvisoryIfNoConsumers()) {
    // Applies to queues and to topics that are not themselves advisory topics.
    if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) {
      Message message = msg.copy();
      // NOTE(review): original destination / transaction id are only overwritten when
      // they are already non-null — confirm this is the intended condition.
      if (message.getOriginalDestination() != null) {
        message.setOriginalDestination(message.getDestination());
      if (message.getOriginalTransactionId() != null) {
        message.setOriginalTransactionId(message.getTransactionId());
      if (destination.isQueue()) {
        advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
      } else {
        advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
      message.setDestination(advisoryTopic);
      message.setTransactionId(null);

代码示例来源:origin: apache/activemq

// Excerpt (many lines elided, braces unbalanced): loops while executor.dequeueNoWait()
// yields a dispatch. Visible branches build an early MessageAck — EXPIRED_ACK_TYPE in one
// branch, POSION_ACK_TYPE with a poison cause for duplicates — and otherwise a
// STANDARD_ACK_TYPE ack is prepared for the delivered message.
// NOTE(review): the condition guarding the first (expired) branch is not shown here.
while ((messageDispatch = executor.dequeueNoWait()) != null) {
  final MessageDispatch md = messageDispatch;
  final ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
    earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1);
    earlyAck.setFirstMessageId(message.getMessageId());
  } else if (connection.isDuplicate(ActiveMQSession.this, message)) {
    LOG.debug("{} got duplicate: {}", this, message.getMessageId());
    earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
    earlyAck.setFirstMessageId(md.getMessage().getMessageId());
    earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this));
  // Client/individual acknowledge modes install a callback on the message (body elided).
  if (isClientAcknowledge()||isIndividualAcknowledge()) {
    message.setAcknowledgeCallback(new Callback() {
      @Override
  md.setDeliverySequenceId(getNextDeliveryId());
  lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId();
  final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
      ack.setFirstMessageId(md.getMessage().getMessageId());
      doStartTransaction();
      ack.setTransactionId(getTransactionContext().getTransactionId());
      connection.onClientInternalException(e);
    } finally {
      if (ack.getTransactionId() == null) {

代码示例来源:origin: apache/activemq

/**
 * Produces a forwardable copy of the dispatched message.
 *
 * <p>The copy gets {@code localBrokerPath} appended to its broker path, this component's
 * producer id, the dispatch's destination, and no memory usage. Its transaction id is moved
 * into the original-transaction-id slot (if that slot is empty) and then cleared. When
 * compression is enabled in the configuration the copy is compressed.
 *
 * @param md the dispatch whose message is copied
 * @return the configured copy
 * @throws IOException declared by the method; the visible code does not show which call
 *     throws it
 */
protected Message configureMessage(MessageDispatch md) throws IOException {
  final Message outbound = md.getMessage().copy();

  // Update the packet to show where it came from.
  outbound.setBrokerPath(appendToBrokerPath(outbound.getBrokerPath(), localBrokerPath));
  outbound.setProducerId(producerInfo.getProducerId());
  outbound.setDestination(md.getDestination());
  outbound.setMemoryUsage(null);

  // Preserve the first transaction id seen before dropping the current one.
  final boolean noOriginalTx = outbound.getOriginalTransactionId() == null;
  if (noOriginalTx) {
    outbound.setOriginalTransactionId(outbound.getTransactionId());
  }
  outbound.setTransactionId(null);

  if (configuration.isUseCompression()) {
    outbound.compress();
  }
  return outbound;
}

代码示例来源:origin: apache/activemq

// Excerpt (guards and closing braces elided): rollback handling for a dispatched message.
// Visible steps: undo duplicate tracking, bail out early in two logged cases, then send
// either a POSION_ACK_TYPE ack (redelivery counter has reached the policy maximum) or a
// REDELIVERED_ACK_TYPE ack, and finally notify the message that it was rolled back.
// NOTE(review): the `else` separating the two ack branches is not shown in this excerpt.
connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
  LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport());
  return;
if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
  LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport());
  return;
RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
int redeliveryCounter = md.getMessage().getRedeliveryCounter();
// A policy maximum of NO_MAXIMUM_REDELIVERIES disables the poison check.
if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
    && redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) {
  MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
  ack.setFirstMessageId(md.getMessage().getMessageId());
  ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
  asyncSendPacket(ack);
  MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
  ack.setFirstMessageId(md.getMessage().getMessageId());
  asyncSendPacket(ack);
md.getMessage().onMessageRolledBack();

代码示例来源:origin: apache/activemq

// Excerpt (type checks and braces elided): appends a textual description of `bc` to `sb`.
// For a MessageDispatch: message id, command id, producer value, correlation id, type and
// consumer id. For a MessageAck: consumer id and the first-last message id range.
// NOTE(review): `sid` is not used anywhere in the visible lines.
m = ((MessageDispatch)bc).getMessage();   
sb.append(m.getMessageId());
sb.append(',');
sb.append(m.getCommandId());
ProducerId pid = m.getProducerId();
long sid = pid.getSessionId();
sb.append(',');
sb.append(pid.getValue());
sb.append(',');
sb.append(m.getCorrelationId());
sb.append(',');
sb.append(m.getType());
sb.append(((MessageDispatch)bc).getConsumerId());
// Acknowledgement form of the description.
MessageAck ma = (MessageAck)bc;
sb.append(" ConsumerID:");
sb.append(ma.getConsumerId());
sb.append(" ack:");
sb.append(ma.getFirstMessageId());
sb.append('-');
sb.append(ma.getLastMessageId());

代码示例来源:origin: apache/activemq

// Excerpt (loops and braces elided): when the subscription's destination matches `next`,
// the subscription is added and, for topic virtual destinations that are always
// retroactive (or when the consumer is retroactive), copies of messages are re-addressed
// to `newDestination` and handed to the subscription as recovered messages.
// NOTE(review): where `message`, `newDestination` and `regionDest` come from is not shown.
int match = sub.getActiveMQDestination().compareTo(next.getActiveMQDestination());
if (match == 0 || (!next.getActiveMQDestination().isPattern() && match == 1)) {
  super.addSubscription(context, sub);
  final RegionBroker regionBroker = (RegionBroker) context.getBroker().getAdaptor(RegionBroker.class);
  final Set<Destination> virtualDests = regionBroker.getDestinations(virtualDestination);
    if (virtualDest.getActiveMQDestination().isTopic() &&
        (virtualDest.isAlwaysRetroactive() || sub.getConsumerInfo().isRetroactive())) {
          final Message copy = message.copy();
          copy.setOriginalDestination(message.getDestination());
          copy.setDestination(newDestination);
          // Resolve the region destination only once, on first use.
          if (regionDest == null) {
            regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]);
          copy.setRegionDestination(regionDest);
          // Queue targets receive the copy wrapped in an IndirectMessageReference.
          sub.addRecoveredMessage(context, newDestination.isQueue() ? new IndirectMessageReference(copy) : copy);

代码示例来源:origin: apache/activemq

// Excerpt (many lines and braces elided): send path of a destination. The message is
// bound to this region destination; expired messages are reported to the root broker via
// messageExpired, and a ProducerAck sized by message.getSize() is dispatched when
// `sendProducerAck` holds.
// NOTE(review): the flow-control branch around messagesWaitingForSpace is only partially
// visible — do not rely on this excerpt for its exact conditions.
message.setRegionDestination(this);
ProducerState state = producerExchange.getProducerState();
if (state == null) {
// A producer ack is sent only when no response is required, the producer has a window,
// and the context is not in recovery mode.
final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
    && !context.isInRecoveryMode();
if (message.isExpired()) {
  broker.getRoot().messageExpired(context, message, null);
  if (sendProducerAck) {
    ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
    context.getConnection().dispatchAsync(ack);
    if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
        messagesWaitingForSpace.put(message.getMessageId(), new Runnable() {
          @Override
          public void run() {
            + message.getProducerId() + ") stopped to prevent flooding "
      if (message.isExpired()) {
        LOG.debug("Expired message: {}", message);
        broker.getRoot().messageExpired(context, message, null);
        return;
  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());

代码示例来源:origin: apache/activemq

/**
 * Stores an update for the given message: the message is marshalled, wrapped in a
 * {@code KahaAddMessageCommand} nested inside a {@code KahaUpdateMessageCommand}, and
 * passed to {@code store}.
 *
 * @param message the message whose stored form is being updated
 * @throws IOException declared by the method; the visible code does not show which call
 *     throws it
 */
@Override
public void updateMessage(Message message) throws IOException {
  if (LOG.isTraceEnabled()) {
    LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter());
  }

  // Build the inner "add" command that carries the re-marshalled message.
  final KahaAddMessageCommand addCommand = new KahaAddMessageCommand();
  addCommand.setDestination(dest);
  addCommand.setMessageId(message.getMessageId().toProducerKey());
  addCommand.setPriority(message.getPriority());
  addCommand.setPrioritySupported(prioritizedMessages);
  final org.apache.activemq.util.ByteSequence marshalled = wireFormat.marshal(message);
  addCommand.setMessage(new Buffer(marshalled.getData(), marshalled.getOffset(), marshalled.getLength()));

  // Wrap it in the update command and store it.
  final KahaUpdateMessageCommand updateCommand = new KahaUpdateMessageCommand();
  updateCommand.setMessage(addCommand);
  store(updateCommand, isEnableJournalDiskSyncs(), null, null);
}

代码示例来源:origin: apache/activemq

// Excerpt (guards and braces elided): routes a message to the dead letter destination
// chosen by the dead letter strategy. A message already addressed to that destination is
// not re-added. Otherwise a copy is given the strategy's expiration, forced persistent
// (recording the original delivery mode), tagged with the poison cause, and resent.
// NOTE(review): the conditions guarding the poison-cause property and the final debug log
// are not visible in this excerpt.
ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(message, subscription);
  if (deadLetterDestination.equals(message.getDestination())) {
    LOG.debug("Not re-adding to DLQ: {}, dest: {}", message.getMessageId(), message.getDestination());
    return false;
  // Work on a copy from here on.
  message = message.copy();
  long dlqExpiration = deadLetterStrategy.getExpiration();
  if (dlqExpiration > 0) {
    stampAsExpired(message);
  message.setExpiration(dlqExpiration);
  if (!message.isPersistent()) {
    message.setPersistent(true);
    message.setProperty("originalDeliveryMode", "NON_PERSISTENT");
    message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
        poisonCause.toString());
  // Without a broker security context, a connection context from BrokerSupport is used.
  if (context.getSecurityContext() == null || !context.getSecurityContext().isBrokerContext()) {
    adminContext = BrokerSupport.getConnectionContext(this);
  addDestination(adminContext, deadLetterDestination, false).getActiveMQDestination().setDLQ(true);
  BrokerSupport.resendNoCopy(adminContext, message, deadLetterDestination);
  return true;
LOG.debug("Dead Letter message with no DLQ strategy in place, message id: {}, destination: {}", message.getMessageId(), message.getDestination());

代码示例来源:origin: apache/activemq

// Excerpt (log statements, try blocks and braces elided): handling of a MessageDispatch
// command by a network bridge. The enqueue statistic is incremented, the demand
// subscription is looked up by the dispatch's consumer id, and messages are acknowledged
// to the local broker with INDIVIDUAL_ACK_TYPE acks.
// NOTE(review): where `message` is assigned is not shown in this excerpt.
networkBridgeStatistics.getEnqueues().increment();
final MessageDispatch md = (MessageDispatch) command;
final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
        configuration.getBrokerName(), remoteBrokerName, Arrays.toString(md.getMessage().getBrokerPath()), md.getMessage()
    });
      localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
    } finally {
      // Balances the incrementOutstandingResponses() call made in the guard above.
      sub.decrementOutstandingResponses();
      configuration.getBrokerName(), remoteBrokerName, md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), (LOG.isTraceEnabled() ? message : message.getMessageId())
  });
  if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
  if (isPermissableDestination(md.getDestination())) {
    if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
       localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
       networkBridgeStatistics.getDequeues().increment();
     } finally {

代码示例来源:origin: apache/activemq

// Excerpt (loops, conditions and braces elided): consumer-side rollback. Duplicate
// tracking is rolled back, the next redelivery delay is taken from the redelivery policy,
// and either a POSION_ACK_TYPE ack (redelivery counter above the policy maximum) or a
// REDELIVERED_ACK_TYPE ack covering all delivered messages is sent.
// NOTE(review): the `else` branches between the paired statements below are not shown.
session.connection.rollbackDuplicate(this, md.getMessage());
final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
if (currentRedeliveryCount > 0) {
  redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
  redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
  md.getMessage().onMessageRolledBack();
  session.connection.rollbackDuplicate(this, md.getMessage());
  && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
  MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
  ack.setFirstMessageId(firstMsgId);
  // The rollback cause is passed both in the message text and as the Throwable's cause.
  ack.setPoisonCause(new Throwable("Delivery[" + lastMd.getMessage().getRedeliveryCounter()  + "] exceeds redelivery policy limit:" + redeliveryPolicy
      + ", cause:" + lastMd.getRollbackCause(), lastMd.getRollbackCause()));
  session.sendAck(ack,true);
    MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
    ack.setFirstMessageId(firstMsgId);
    session.sendAck(ack,true);

代码示例来源:origin: apache/activemq

/**
 * Forwards a message to {@code next}, fanning it out when its destination is composite.
 *
 * <p>For a composite destination the message is sent once per component destination. The
 * first send reuses the incoming message; every later send uses a fresh copy of the
 * previously sent message with its memory usage cleared. Each sent message records the
 * composite destination as its original destination. Non-composite messages are forwarded
 * unchanged.
 *
 * @param producerExchange the producer exchange passed through to {@code next}
 * @param message the message to forward
 * @throws Exception if a downstream send fails
 */
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
  final ActiveMQDestination destination = message.getDestination();
  if (!destination.isComposite()) {
    next.send(producerExchange, message);
    return;
  }

  boolean first = true;
  for (ActiveMQDestination target : destination.getCompositeDestinations()) {
    if (first) {
      first = false;
    } else {
      // Later targets must not share the instance already handed downstream.
      message = message.copy();
      message.setMemoryUsage(null);
    }
    message.setOriginalDestination(destination);
    message.setDestination(target);
    next.send(producerExchange, message);
  }
}

相关文章

Message类方法