org.apache.activemq.artemis.api.core.Message类的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(12.7k)|赞(0)|评价(0)|浏览(286)

本文整理了Java中org.apache.activemq.artemis.api.core.Message类的一些代码示例,展示了Message类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Message类的具体详情如下:
包路径:org.apache.activemq.artemis.api.core.Message
类名称:Message

Message介绍

[英]A Message is a routable instance that has a payload.

The payload (the "body") is opaque to the messaging system. A Message also has a fixed set of headers (required by the messaging system) and properties (defined by the users) that can be used by the messaging system to route the message (e.g. to ensure it matches a queue filter).

Message Properties

Message can contain properties specified by the users. It is possible to convert from some types to other types as specified by the following table:

|        | boolean byte short int long float double String byte[] 
|---------------------------------------------------------------- 
|boolean |    X                                      X 
|byte    |          X    X    X   X                  X 
|short   |               X    X   X                  X 
|int     |                    X   X                  X 
|long    |                        X                  X 
|float   |                              X     X      X 
|double  |                                    X      X 
|String  |    X     X    X    X   X     X     X      X 
|byte[]  |                                                   X 
|-----------------------------------------------------------------

If conversion is not allowed (for example calling getFloatProperty on a property set a boolean), a ActiveMQPropertyConversionException will be thrown. User cases that will be covered by Message Receiving a buffer: Message encode = new CoreMessage(); // or any other implementation encode.receiveBuffer(buffer); Sending to a buffer: Message encode; size = encode.getEncodeSize(); encode.encodeDirectly(bufferOutput);
[中]消息是具有有效负载的可路由实例。
有效载荷(“主体”)对消息传递系统是不透明的。消息还具有一组固定的头(消息传递系统需要)和属性(由用户定义),消息传递系统可以使用这些头和属性路由消息(例如,确保消息与队列筛选器匹配)。
消息属性
消息可以包含用户指定的属性。可以从某些类型转换为下表指定的其他类型:

|        | boolean byte short int long float double String byte[] 
|---------------------------------------------------------------- 
|boolean |    X                                      X 
|byte    |          X    X    X   X                  X 
|short   |               X    X   X                  X 
|int     |                    X   X                  X 
|long    |                        X                  X 
|float   |                              X     X      X 
|double  |                                    X      X 
|String  |    X     X    X    X   X     X     X      X 
|byte[]  |                                                   X 
|-----------------------------------------------------------------

如果不允许转换(例如,将属性集上的getFloatProperty调用为布尔值),将抛出ActiveMQPropertyConversionException。接收缓冲区的消息将涵盖的用户案例:Message encode=new CoreMessage();//或任何其他实现编码。接收缓冲区(缓冲区);发送到缓冲区:消息编码;大小=编码。getEncodeSize();编码直接编码(缓冲输出);

代码示例

代码示例来源:origin: wildfly/wildfly

/**
* @return Returns the message in Map form, useful when encoding to JSON
*/
default Map<String, Object> toMap() {
 Map map = toPropertyMap();
 map.put("messageID", getMessageID());
 Object userID = getUserID();
 if (getUserID() != null) {
   map.put("userID", "ID:" + userID.toString());
 }
 map.put("address", getAddress() == null ? "" : getAddress());
 map.put("durable", isDurable());
 map.put("expiration", getExpiration());
 map.put("timestamp", getTimestamp());
 map.put("priority", (int)getPriority());
 return map;
}

代码示例来源:origin: wildfly/wildfly

public static void setJMSType(Message message, String type) {
 message.putStringProperty(TYPE_HEADER_NAME, new SimpleString(type));
}

代码示例来源:origin: wildfly/wildfly

/**
* Used by ActiveMQ Artemis management service.
*/
public static Object[] retrieveOperationParameters(final Message message) throws Exception {
 SimpleString sstring = message.toCore().getReadOnlyBodyBuffer().readNullableSimpleString();
 String jsonString = (sstring == null) ? null : sstring.toString();
 if (jsonString != null) {
   JsonArray jsonArray = JsonUtil.readJsonArray(jsonString);
   return JsonUtil.fromJsonArray(jsonArray);
 } else {
   return null;
 }
}

代码示例来源:origin: wildfly/wildfly

public static void setJMSReplyTo(Message message, final SimpleString dest) {
 if (dest == null) {
   message.removeProperty(REPLYTO_HEADER_NAME);
 } else {
   message.putStringProperty(REPLYTO_HEADER_NAME, dest);
 }
}

代码示例来源:origin: wildfly/wildfly

public static void setJMSCorrelationID(Message message, final String correlationID) {
 if (correlationID == null) {
   message.removeProperty(CORRELATIONID_HEADER_NAME);
 } else {
   message.putStringProperty(CORRELATIONID_HEADER_NAME, new SimpleString(correlationID));
 }
}

代码示例来源:origin: wildfly/wildfly

default void referenceOriginalMessage(final Message original, String originalQueue) {
 String queueOnMessage = original.getAnnotationString(Message.HDR_ORIGINAL_QUEUE);
 if (queueOnMessage != null) {
   setAnnotation(Message.HDR_ORIGINAL_QUEUE, queueOnMessage);
 } else if (originalQueue != null) {
   setAnnotation(Message.HDR_ORIGINAL_QUEUE, originalQueue);
 }
 Object originalID = original.getAnnotation(Message.HDR_ORIG_MESSAGE_ID);
 if (originalID != null) {
   setAnnotation(Message.HDR_ORIGINAL_ADDRESS, original.getAnnotationString(Message.HDR_ORIGINAL_ADDRESS));
   setAnnotation(Message.HDR_ORIG_MESSAGE_ID, originalID);
 } else {
   setAnnotation(Message.HDR_ORIGINAL_ADDRESS, original.getAddress());
   setAnnotation(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID());
 }
 // reset expiry
 setExpiration(0);
}

代码示例来源:origin: apache/activemq-artemis

ActiveMQServer server = createServer(true, isNetty(), storeType);
server.start();
SimpleString ADDRESS_EXPIRY = ADDRESS.concat("-expiry");
server.getAddressSettingsRepository().addMatch("*", addressSettings);
session = sf.createSession(false, false, false);
session.createQueue(ADDRESS, ADDRESS, true);
session.createQueue(ADDRESS_EXPIRY, ADDRESS_EXPIRY, true);
ClientProducer producer = session.createProducer(ADDRESS);
clientFile.setExpiration(System.currentTimeMillis());
 Assert.assertEquals(ActiveMQTestBase.getSamplebyte(i), msg1.getBodyBuffer().readByte());
session.close();
server.stop();
session = sf.createSession(false, false, false);
session.start();
 Assert.assertEquals(ActiveMQTestBase.getSamplebyte(i), msg1.getBodyBuffer().readByte());

代码示例来源:origin: apache/activemq-artemis

connectors.put(server1tc.getName(), server1tc);
server0.getConfiguration().setConnectorConfigurations(connectors);
server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
server0.start();
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
ClientSession session0 = sf0.createSession(false, true, true);
ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
final SimpleString propKey = new SimpleString("testkey");
final SimpleString selectorKey = new SimpleString("animal");
  ClientMessage message = session0.createMessage(true);
  message.getBodyBuffer().writeBytes(new byte[1024]);
   ids[i] = iterator.next().getMessage().getMessageID();
ClientSession session1 = sf1.createSession(false, true, true);

代码示例来源:origin: apache/activemq-artemis

@Test
public void testNoCursors() throws Exception {
 Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
 server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
 server.start();
 ServerLocator locator = createInVMNonHALocator();
 ClientSessionFactory sf = locator.createSessionFactory();
 ClientSession session = sf.createSession();
 session.createQueue(ADDRESS, ADDRESS, true);
 ClientProducer prod = session.createProducer(ADDRESS);
 for (int i = 0; i < 100; i++) {
   Message msg = session.createMessage(true);
   msg.toCore().getBodyBuffer().writeBytes(new byte[1024]);
   prod.send(msg);
 }
 session.commit();
 session.deleteQueue(ADDRESS);
 session.close();
 sf.close();
 locator.close();
 server.stop();
 server.start();
 waitForNotPaging(server.getPagingManager().getPageStore(ADDRESS));
 server.stop();
}

代码示例来源:origin: apache/activemq-artemis

ActiveMQServer server = createServer(true, isNetty(), storeType);
server.start();
session = sf.createSession(true, true, 0);
session.createQueue(ADDRESS, ADDRESS, true);
ClientProducer producer = session.createProducer(ADDRESS);
   clientFile.putBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, someDuplicateInfo.getBytes());
 } else {
   clientFile.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, someDuplicateInfo.getBytes());
ClientConsumer consumer = session.createConsumer(ADDRESS);
session.start();
 assertEquals(getSamplebyte(i), msg.getBodyBuffer().readByte());

代码示例来源:origin: wildfly/wildfly

@Override
  public Message decode(ActiveMQBuffer buffer, Message record) {
   // the caller must consume the first byte already, as that will be used to decide what persister (protocol) to use
   long id = buffer.readLong();
   SimpleString address = buffer.readNullableSimpleString();
   record = new CoreMessage();
   record.reloadPersistence(buffer);
   record.setMessageID(id);
   record.setAddress(address);
   return record;
  }
}

代码示例来源:origin: apache/activemq-artemis

ClientSession session = addClientSession(sf.createSession(false, false));
ClientProducer producer = addClientProducer(session.createProducer(addressName));
  Message message = session.createMessage(false);
  if (i % 2 == 0)
   message.putStringProperty(ClusterTestBase.FILTER_PROP, new SimpleString("0"));
  else
   message.putStringProperty(ClusterTestBase.FILTER_PROP, new SimpleString("1"));
  producer.send(message);
session.commit();
servers[0].stop();

代码示例来源:origin: wildfly/wildfly

/** Sub classes must add the first short as the protocol-id */
@Override
public void encode(ActiveMQBuffer buffer, Message record) {
 buffer.writeByte((byte)1);
 buffer.writeLong(record.getMessageID());
 buffer.writeNullableSimpleString(record.getAddressSimpleString());
 record.persist(buffer);
}

代码示例来源:origin: wildfly/wildfly

@Override
public void copyHeadersAndProperties(final Message msg) {
 messageID = msg.getMessageID();
 address = msg.getAddressSimpleString();
 userID = (UUID) msg.getUserID();
 type = msg.toCore().getType();
 durable = msg.isDurable();
 expiration = msg.getExpiration();
 timestamp = msg.getTimestamp();
 priority = msg.getPriority();
 if (msg instanceof CoreMessage) {
   properties = ((CoreMessage) msg).getTypedProperties();
 }
}

代码示例来源:origin: apache/activemq-artemis

@Override
public Message transform(final Message messageParameter) {
 ICoreMessage message = messageParameter.toCore();
 SimpleString oldProp = (SimpleString) message.getObjectProperty(new SimpleString("wibble"));
 if (!oldProp.equals(new SimpleString("bing"))) {
   throw new IllegalStateException("Wrong property value!!");
 }
 // Change a property
 message.putStringProperty(new SimpleString("wibble"), new SimpleString("bong"));
 // Change the body
 ActiveMQBuffer buffer = message.getBodyBuffer();
 buffer.readerIndex(0);
 String str = buffer.readString();
 if (!str.equals("doo be doo be doo be doo")) {
   throw new IllegalStateException("Wrong body!!");
 }
 buffer.clear();
 buffer.writeString("dee be dee be dee be dee");
 return message;
}

代码示例来源:origin: apache/activemq-artemis

Queue queue = ref.getQueue();
    long queueID;
    String queueName = queue.getName().toString();
      queueID = createQueueWithRoutingTypeIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString(), message.getRoutingType());
    Queue queue = ref.getQueue();
    long queueID;
    String queueName = queue.getName().toString();
      queueID = createQueueWithRoutingTypeIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString(), message.getRoutingType());
ClientProducer producer = session.createProducer();
for (Map.Entry<Message, Pair<List<Long>, List<Long>>> entry : queuesToSendTo.entrySet()) {
 List<Long> ids = entry.getValue().getA();
 message.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), buffer.array());
 ids = entry.getValue().getB();
 if (ids.size() > 0) {
    buffer.putLong(id);
   message.putBytesProperty(Message.HDR_ROUTE_TO_ACK_IDS.toString(), buffer.array());
 producer.send(message.getAddressSimpleString().toString(), message);
session.end(xid, XAResource.TMSUCCESS);
session.prepare(xid);

代码示例来源:origin: wildfly/wildfly

public static Set<String> getPropertyNames(Message message) {
 HashSet<String> set = new HashSet<>();
 for (SimpleString propName : message.getPropertyNames()) {
   if (propName.equals(Message.HDR_GROUP_ID)) {
    set.add(MessageUtil.JMSXGROUPID);
   } else if (propName.equals(Message.HDR_VALIDATED_USER)) {
    set.add(MessageUtil.JMSXUSERID);
   } else if ((!propName.startsWith(JMS) || propName.startsWith(JMSX) || propName.startsWith(JMS_)) && !propName.startsWith(CONNECTION_ID_PROPERTY_NAME) && !propName.equals(Message.HDR_ROUTING_TYPE) && !propName.startsWith(Message.HDR_ROUTE_TO_IDS)) {
    set.add(propName.toString());
   }
 }
 set.add(JMSXDELIVERYCOUNT);
 return set;
}

代码示例来源:origin: apache/activemq-artemis

ActiveMQServer server = createServer(true, isNetty());
server.getConfiguration().getIncomingInterceptorClassNames().add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName());
server.start();
session = sf.createSession(false, true, true);
session.createQueue(ADDRESS, ADDRESS, true);
ClientProducer producer = session.createProducer(ADDRESS);
clientFile.setExpiration(System.currentTimeMillis());
server.fail(false);

代码示例来源:origin: apache/activemq-artemis

ClientMessage managementMessage = managementSession.createMessage(false);
     ManagementHelper.putAttribute(managementMessage, ResourceNames.QUEUE + queue, "ID");
     managementSession.start();
     if (logger.isDebugEnabled()) {
      logger.debug("Requesting ID for: " + queue);
message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array());
try (ClientProducer producer = session.createProducer(destination)) {
  producer.send(message);

代码示例来源:origin: wildfly/wildfly

public static boolean propertyExists(Message message, String name) {
   return message.containsProperty(new SimpleString(name)) || name.equals(MessageUtil.JMSXDELIVERYCOUNT) ||
     (MessageUtil.JMSXGROUPID.equals(name) && message.containsProperty(Message.HDR_GROUP_ID)) ||
     (MessageUtil.JMSXUSERID.equals(name) && message.containsProperty(Message.HDR_VALIDATED_USER));
  }
}

相关文章