本文整理了Java中org.apache.activemq.command.Message类的一些代码示例,展示了Message类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Message类的具体详情如下:
包路径:org.apache.activemq.command.Message
类名称:Message
[英]Represents an ActiveMQ message
[中]表示ActiveMQ消息
代码示例来源:origin: apache/activemq
/**
 * Re-sends a message to {@code deadLetterDestination} through the broker of the
 * given connection context.
 *
 * <p>The message's current destination and transaction id are recorded as its
 * "original" destination / transaction id before being overwritten. Producer
 * flow control on the context is switched off for the duration of the send and
 * restored afterwards, even if the send throws.
 *
 * @param context               connection context whose broker performs the send
 * @param originalMessage       the message to resend
 * @param deadLetterDestination destination the message is redirected to
 * @param copy                  when {@code true} a copy of the message is sent and
 *                              the argument is left untouched; otherwise the
 *                              argument itself is modified and sent
 * @throws Exception if the broker send fails
 */
public static void doResend(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination, boolean copy) throws Exception {
    final Message outgoing;
    if (copy) {
        outgoing = originalMessage.copy();
    } else {
        outgoing = originalMessage;
    }

    // Preserve where the message came from before redirecting it.
    outgoing.setOriginalDestination(outgoing.getDestination());
    outgoing.setOriginalTransactionId(outgoing.getTransactionId());
    outgoing.setDestination(deadLetterDestination);

    // Reset the per-delivery state carried by the message.
    outgoing.setTransactionId(null);
    outgoing.setMemoryUsage(null);
    outgoing.setRedeliveryCounter(0);
    outgoing.getMessageId().setDataLocator(null);

    final boolean savedFlowControl = context.isProducerFlowControl();
    try {
        context.setProducerFlowControl(false);

        final ProducerState producerState = new ProducerState(new ProducerInfo());
        final ProducerBrokerExchange exchange = new ProducerBrokerExchange();
        exchange.setProducerState(producerState);
        exchange.setMutable(true);
        exchange.setConnectionContext(context);

        context.getBroker().send(exchange, outgoing);
    } finally {
        // Always put the caller's flow-control setting back.
        context.setProducerFlowControl(savedFlowControl);
    }
}
代码示例来源:origin: apache/activemq
/**
 * Re-stamps the message's timestamp (and, where applicable, its expiration)
 * with the broker's clock before passing it on to {@code super.send}.
 *
 * <p>Re-stamping is only attempted when the message has a positive timestamp,
 * is not destined for a DLQ (as decided by {@code isDestinationDLQ}), and
 * either {@code processNetworkMessages} is set or the message's broker path is
 * null or empty.
 *
 * @param producerExchange the producer exchange forwarded unchanged
 * @param message          the message to re-stamp and forward
 * @throws Exception if the downstream send fails
 */
@Override
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
    final boolean eligible = message.getTimestamp() > 0
            && !isDestinationDLQ(message)
            && (processNetworkMessages
                    || message.getBrokerPath() == null
                    || message.getBrokerPath().length == 0);

    if (eligible) {
        // timestamp not been disabled and has not passed through a network or processNetworkMessages=true
        final long previousExpiration = message.getExpiration();
        final long brokerNow = System.currentTimeMillis();
        final long previousTimestamp = message.getTimestamp();

        // Keep the producer's relative time-to-live; fall back to the override
        // when the message carried no expiration.
        long ttl = previousExpiration > 0
                ? previousExpiration - previousTimestamp
                : zeroExpirationOverride;

        // Clamp to the ceiling when one is configured (a positive ceiling that is
        // exceeded implies the ttl itself is positive).
        if (ttlCeiling > 0 && ttl > ttlCeiling) {
            ttl = ttlCeiling;
        }

        final long newExpiration = ttl + brokerNow;

        // In the scenario that the Broker is behind the clients we never want to set the
        // Timestamp and Expiration in the past
        if (!futureOnly || newExpiration > previousExpiration) {
            if (ttl > 0 && newExpiration > 0) {
                message.setExpiration(newExpiration);
            }
            message.setTimestamp(brokerNow);
            LOG.debug("Set message {} timestamp from {} to {}", new Object[]{ message.getMessageId(), previousTimestamp, brokerNow });
        }
    }

    super.send(producerExchange, message);
}
代码示例来源:origin: apache/activemq
/**
 * Unmarshals a stored message, tags its message id with the given sequence id
 * and hands it to the recovery listener.
 *
 * @param sequenceId the sequence id applied to the message id as broker
 *                   sequence id, future-or-sequence value and entry locator
 * @param data       the marshalled message bytes
 * @return always {@code true}
 * @throws Exception if unmarshalling or the listener callback fails
 */
@Override
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
    final ByteSequence marshalled = new ByteSequence(data);
    final Message recovered = (Message) wireFormat.unmarshal(marshalled);

    // The same sequence id is recorded in all three places on the message id.
    recovered.getMessageId().setBrokerSequenceId(sequenceId);
    recovered.getMessageId().setFutureOrSequenceLong(sequenceId);
    recovered.getMessageId().setEntryLocator(sequenceId);

    listener.recoverMessage(recovered);
    trackLastRecovered(sequenceId, recovered.getPriority());
    return true;
}
代码示例来源:origin: apache/activemq
/**
 * Returns a copy of {@code original}. When {@code setOriginalDestination} is
 * enabled, the copy is re-addressed to {@code target} and remembers the
 * original message's destination as its original destination.
 *
 * @param original the message to duplicate
 * @param target   the destination applied to the copy when re-addressing is enabled
 * @return the copied message
 */
private Message copy(Message original, ActiveMQDestination target) {
    final Message duplicate = original.copy();
    if (!setOriginalDestination) {
        return duplicate;
    }
    duplicate.setDestination(target);
    duplicate.setOriginalDestination(original.getDestination());
    return duplicate;
}
代码示例来源:origin: apache/activemq
/**
 * Records the message's current expiration under the {@code ORIGINAL_EXPIRATION}
 * property, unless that property is already present.
 *
 * @param message the message to stamp
 * @return {@code true} if the property was written by this call, {@code false}
 *         if the message already carried it
 * @throws IOException propagated from the message's property accessors
 */
private boolean stampAsExpired(Message message) throws IOException {
    if (message.getProperty(ORIGINAL_EXPIRATION) != null) {
        // Already stamped; keep the first recorded value.
        return false;
    }
    // Long.valueOf replaces the deprecated-for-removal Long(long) constructor.
    message.setProperty(ORIGINAL_EXPIRATION, Long.valueOf(message.getExpiration()));
    return true;
}
代码示例来源:origin: apache/activemq
/**
 * Logs the message at debug level and reports that it must not be sent to a
 * dead letter queue.
 *
 * @param message the message being considered
 * @return always {@code false}
 */
@Override
public boolean isSendToDeadLetterQueue(Message message) {
    LOG.debug("Discarding message sent to DLQ: {}, dest: {}", message.getMessageId(), message.getDestination());
    return false;
}
}
代码示例来源:origin: apache/activemq
// Excerpt: the enclosing method's signature is not shown here.
// Writes the fields of `info` to `dataOut` one after another; the order of the
// calls below is the order in which the fields are written.
// beforeMarshall is invoked on `info` before any field is written.
info.beforeMarshall(wireFormat);
// Identity and routing fields.
looseMarshalCachedObject(wireFormat, (DataStructure)info.getProducerId(), dataOut);
looseMarshalCachedObject(wireFormat, (DataStructure)info.getDestination(), dataOut);
looseMarshalCachedObject(wireFormat, (DataStructure)info.getTransactionId(), dataOut);
looseMarshalCachedObject(wireFormat, (DataStructure)info.getOriginalDestination(), dataOut);
looseMarshalNestedObject(wireFormat, (DataStructure)info.getMessageId(), dataOut);
looseMarshalCachedObject(wireFormat, (DataStructure)info.getOriginalTransactionId(), dataOut);
// Header-style fields: grouping, correlation, persistence, expiration, priority, reply-to, timestamp, type.
looseMarshalString(info.getGroupID(), dataOut);
dataOut.writeInt(info.getGroupSequence());
looseMarshalString(info.getCorrelationId(), dataOut);
dataOut.writeBoolean(info.isPersistent());
looseMarshalLong(wireFormat, info.getExpiration(), dataOut);
dataOut.writeByte(info.getPriority());
looseMarshalNestedObject(wireFormat, (DataStructure)info.getReplyTo(), dataOut);
looseMarshalLong(wireFormat, info.getTimestamp(), dataOut);
looseMarshalString(info.getType(), dataOut);
// Content and the already-marshalled properties are written as byte sequences.
looseMarshalByteSequence(wireFormat, info.getContent(), dataOut);
looseMarshalByteSequence(wireFormat, info.getMarshalledProperties(), dataOut);
looseMarshalNestedObject(wireFormat, (DataStructure)info.getDataStructure(), dataOut);
looseMarshalCachedObject(wireFormat, (DataStructure)info.getTargetConsumerId(), dataOut);
// Remaining fields: compression flag, redelivery count, broker path, arrival, user id and two flags.
dataOut.writeBoolean(info.isCompressed());
dataOut.writeInt(info.getRedeliveryCounter());
looseMarshalObjectArray(wireFormat, info.getBrokerPath(), dataOut);
looseMarshalLong(wireFormat, info.getArrival(), dataOut);
looseMarshalString(info.getUserID(), dataOut);
dataOut.writeBoolean(info.isRecievedByDFBridge());
dataOut.writeBoolean(info.isDroppable());
代码示例来源:origin: apache/activemq
protected void sendScheduledJob(ConnectionContext context, Job job, ActiveMQDestination replyTo) throws Exception {
org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getPayload());
try {
Message msg = (Message) this.wireFormat.unmarshal(packet);
msg.setOriginalTransactionId(null);
msg.setPersistent(false);
msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
msg.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
msg.setOriginalDestination(msg.getDestination());
msg.setDestination(replyTo);
msg.setResponseRequired(false);
msg.setProducerId(this.producerId);
msg.setProperty("scheduledJobId", job.getJobId());
final boolean originalFlowControl = context.isProducerFlowControl();
final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
producerExchange.setConnectionContext(context);
producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
try {
context.setProducerFlowControl(false);
this.next.send(producerExchange, msg);
} finally {
context.setProducerFlowControl(originalFlowControl);
代码示例来源:origin: apache/activemq
if (!msg.isPersistent()) {
if (isSendAdvisoryIfNoConsumers()) {
if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) {
Message message = msg.copy();
if (message.getOriginalDestination() != null) {
message.setOriginalDestination(message.getDestination());
if (message.getOriginalTransactionId() != null) {
message.setOriginalTransactionId(message.getTransactionId());
if (destination.isQueue()) {
advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
} else {
advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
message.setDestination(advisoryTopic);
message.setTransactionId(null);
代码示例来源:origin: apache/activemq
while ((messageDispatch = executor.dequeueNoWait()) != null) {
final MessageDispatch md = messageDispatch;
final ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1);
earlyAck.setFirstMessageId(message.getMessageId());
} else if (connection.isDuplicate(ActiveMQSession.this, message)) {
LOG.debug("{} got duplicate: {}", this, message.getMessageId());
earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
earlyAck.setFirstMessageId(md.getMessage().getMessageId());
earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this));
if (isClientAcknowledge()||isIndividualAcknowledge()) {
message.setAcknowledgeCallback(new Callback() {
@Override
md.setDeliverySequenceId(getNextDeliveryId());
lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId();
final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
ack.setFirstMessageId(md.getMessage().getMessageId());
doStartTransaction();
ack.setTransactionId(getTransactionContext().getTransactionId());
connection.onClientInternalException(e);
} finally {
if (ack.getTransactionId() == null) {
代码示例来源:origin: apache/activemq
/**
 * Builds the message to forward for a dispatch: a copy of the dispatched
 * message with its broker path extended by {@code localBrokerPath}, re-attributed
 * to this bridge's producer, addressed to the dispatch's destination and with
 * its transaction id cleared.
 *
 * <p>If the copy has no original transaction id yet, its current transaction id
 * is recorded as the original one before being cleared. The copy is compressed
 * when the configuration asks for compression.
 *
 * @param md the dispatch whose message is to be forwarded
 * @return the prepared copy; the dispatch's own message is not modified here
 * @throws IOException declared by the original signature
 */
protected Message configureMessage(MessageDispatch md) throws IOException {
    final Message outbound = md.getMessage().copy();

    // Update the packet to show where it came from.
    outbound.setBrokerPath(appendToBrokerPath(outbound.getBrokerPath(), localBrokerPath));
    outbound.setProducerId(producerInfo.getProducerId());
    outbound.setDestination(md.getDestination());
    outbound.setMemoryUsage(null);

    // Remember the first transaction the message belonged to, then detach it.
    final boolean originalTxUnset = outbound.getOriginalTransactionId() == null;
    if (originalTxUnset) {
        outbound.setOriginalTransactionId(outbound.getTransactionId());
    }
    outbound.setTransactionId(null);

    if (configuration.isUseCompression()) {
        outbound.compress();
    }
    return outbound;
}
代码示例来源:origin: apache/activemq
connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport());
return;
if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport());
return;
RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
int redeliveryCounter = md.getMessage().getRedeliveryCounter();
if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
&& redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) {
MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
ack.setFirstMessageId(md.getMessage().getMessageId());
ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
asyncSendPacket(ack);
MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
ack.setFirstMessageId(md.getMessage().getMessageId());
asyncSendPacket(ack);
md.getMessage().onMessageRolledBack();
代码示例来源:origin: apache/activemq
m = ((MessageDispatch)bc).getMessage();
sb.append(m.getMessageId());
sb.append(',');
sb.append(m.getCommandId());
ProducerId pid = m.getProducerId();
long sid = pid.getSessionId();
sb.append(',');
sb.append(pid.getValue());
sb.append(',');
sb.append(m.getCorrelationId());
sb.append(',');
sb.append(m.getType());
sb.append(((MessageDispatch)bc).getConsumerId());
MessageAck ma = (MessageAck)bc;
sb.append(" ConsumerID:");
sb.append(ma.getConsumerId());
sb.append(" ack:");
sb.append(ma.getFirstMessageId());
sb.append('-');
sb.append(ma.getLastMessageId());
代码示例来源:origin: apache/activemq
int match = sub.getActiveMQDestination().compareTo(next.getActiveMQDestination());
if (match == 0 || (!next.getActiveMQDestination().isPattern() && match == 1)) {
super.addSubscription(context, sub);
final RegionBroker regionBroker = (RegionBroker) context.getBroker().getAdaptor(RegionBroker.class);
final Set<Destination> virtualDests = regionBroker.getDestinations(virtualDestination);
if (virtualDest.getActiveMQDestination().isTopic() &&
(virtualDest.isAlwaysRetroactive() || sub.getConsumerInfo().isRetroactive())) {
final Message copy = message.copy();
copy.setOriginalDestination(message.getDestination());
copy.setDestination(newDestination);
if (regionDest == null) {
regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]);
copy.setRegionDestination(regionDest);
sub.addRecoveredMessage(context, newDestination.isQueue() ? new IndirectMessageReference(copy) : copy);
代码示例来源:origin: apache/activemq
message.setRegionDestination(this);
ProducerState state = producerExchange.getProducerState();
if (state == null) {
final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
&& !context.isInRecoveryMode();
if (message.isExpired()) {
broker.getRoot().messageExpired(context, message, null);
if (sendProducerAck) {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
messagesWaitingForSpace.put(message.getMessageId(), new Runnable() {
@Override
public void run() {
+ message.getProducerId() + ") stopped to prevent flooding "
if (message.isExpired()) {
LOG.debug("Expired message: {}", message);
broker.getRoot().messageExpired(context, message, null);
return;
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
代码示例来源:origin: apache/activemq
/**
 * Stores an update for the given message: the message is marshalled, wrapped
 * in a {@code KahaAddMessageCommand} carrying its producer key and priority,
 * and that command is stored inside a {@code KahaUpdateMessageCommand}.
 *
 * @param message the message whose stored form is to be updated
 * @throws IOException if marshalling or storing fails
 */
@Override
public void updateMessage(Message message) throws IOException {
    if (LOG.isTraceEnabled()) {
        LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter());
    }

    final KahaUpdateMessageCommand update = new KahaUpdateMessageCommand();
    final KahaAddMessageCommand add = new KahaAddMessageCommand();

    add.setDestination(dest);
    add.setMessageId(message.getMessageId().toProducerKey());
    add.setPriority(message.getPriority());
    add.setPrioritySupported(prioritizedMessages);

    // Embed the marshalled message bytes in the add command.
    final org.apache.activemq.util.ByteSequence marshalled = wireFormat.marshal(message);
    final Buffer payload = new Buffer(marshalled.getData(), marshalled.getOffset(), marshalled.getLength());
    add.setMessage(payload);

    update.setMessage(add);
    store(update, isEnableJournalDiskSyncs(), null, null);
}
代码示例来源:origin: apache/activemq
ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(message, subscription);
if (deadLetterDestination.equals(message.getDestination())) {
LOG.debug("Not re-adding to DLQ: {}, dest: {}", message.getMessageId(), message.getDestination());
return false;
message = message.copy();
long dlqExpiration = deadLetterStrategy.getExpiration();
if (dlqExpiration > 0) {
stampAsExpired(message);
message.setExpiration(dlqExpiration);
if (!message.isPersistent()) {
message.setPersistent(true);
message.setProperty("originalDeliveryMode", "NON_PERSISTENT");
message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
poisonCause.toString());
if (context.getSecurityContext() == null || !context.getSecurityContext().isBrokerContext()) {
adminContext = BrokerSupport.getConnectionContext(this);
addDestination(adminContext, deadLetterDestination, false).getActiveMQDestination().setDLQ(true);
BrokerSupport.resendNoCopy(adminContext, message, deadLetterDestination);
return true;
LOG.debug("Dead Letter message with no DLQ strategy in place, message id: {}, destination: {}", message.getMessageId(), message.getDestination());
代码示例来源:origin: apache/activemq
networkBridgeStatistics.getEnqueues().increment();
final MessageDispatch md = (MessageDispatch) command;
final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
configuration.getBrokerName(), remoteBrokerName, Arrays.toString(md.getMessage().getBrokerPath()), md.getMessage()
});
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
} finally {
sub.decrementOutstandingResponses();
configuration.getBrokerName(), remoteBrokerName, md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), (LOG.isTraceEnabled() ? message : message.getMessageId())
});
if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
if (isPermissableDestination(md.getDestination())) {
if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
networkBridgeStatistics.getDequeues().increment();
} finally {
代码示例来源:origin: apache/activemq
session.connection.rollbackDuplicate(this, md.getMessage());
final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
if (currentRedeliveryCount > 0) {
redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
md.getMessage().onMessageRolledBack();
session.connection.rollbackDuplicate(this, md.getMessage());
&& lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
ack.setFirstMessageId(firstMsgId);
ack.setPoisonCause(new Throwable("Delivery[" + lastMd.getMessage().getRedeliveryCounter() + "] exceeds redelivery policy limit:" + redeliveryPolicy
+ ", cause:" + lastMd.getRollbackCause(), lastMd.getRollbackCause()));
session.sendAck(ack,true);
MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
ack.setFirstMessageId(firstMsgId);
session.sendAck(ack,true);
代码示例来源:origin: apache/activemq
/**
 * Forwards a message to the next broker, fanning it out when its destination
 * is composite.
 *
 * <p>For a composite destination the message is sent once per member
 * destination, with the composite recorded as the original destination. The
 * first send uses the given message instance; every later send uses a fresh
 * copy of the previously sent instance with its memory usage cleared.
 * Non-composite destinations are forwarded unchanged.
 *
 * @param producerExchange the producer exchange forwarded with every send
 * @param message          the message to forward
 * @throws Exception if any downstream send fails
 */
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
    final ActiveMQDestination destination = message.getDestination();

    if (!destination.isComposite()) {
        next.send(producerExchange, message);
        return;
    }

    boolean firstMember = true;
    for (ActiveMQDestination member : destination.getCompositeDestinations()) {
        if (!firstMember) {
            // Later members each get their own copy of the message.
            message = message.copy();
            message.setMemoryUsage(null);
        }
        firstMember = false;

        message.setOriginalDestination(destination);
        message.setDestination(member);
        next.send(producerExchange, message);
    }
}
内容来源于网络,如有侵权,请联系作者删除!