org.apache.qpid.proton.message.Message.setMessageAnnotations()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(9.9k)|赞(0)|评价(0)|浏览(129)

本文整理了Java中org.apache.qpid.proton.message.Message.setMessageAnnotations()方法的一些代码示例,展示了Message.setMessageAnnotations()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Message.setMessageAnnotations()方法的具体详情如下:
包路径:org.apache.qpid.proton.message.Message
类名称:Message
方法名:setMessageAnnotations

Message.setMessageAnnotations介绍

暂无

代码示例

代码示例来源:origin: apache/activemq-artemis

/**
 * Creates the backing annotations map on first use and attaches it to the
 * message as its message-annotations section. Does nothing when the map
 * already exists.
 */
private void lazyCreateMessageAnnotations() {
  if (messageAnnotationsMap != null) {
    return;
  }
  messageAnnotationsMap = new HashMap<>();
  message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap));
}

代码示例来源:origin: org.eclipse.hono/hono-core

/**
 * Stores a value under a symbol key in the <em>message annotations</em> of an
 * AMQP 1.0 message, creating the annotations section if the message has none.
 *
 * @param msg the message whose annotations are updated.
 * @param key the name of the symbol under which the value is stored.
 * @param value the value to store.
 */
public static void addAnnotation(final Message msg, final String key, final Object value) {
  final MessageAnnotations existing = msg.getMessageAnnotations();
  final MessageAnnotations target;
  if (existing != null) {
    target = existing;
  } else {
    // No annotations yet: attach a fresh, empty section to the message first.
    target = new MessageAnnotations(new HashMap<>());
    msg.setMessageAnnotations(target);
  }
  target.getValue().put(Symbol.getSymbol(key), value);
}

代码示例来源:origin: Azure/azure-event-hubs-java

/**
 * Builds the AMQP message via {@code toAmqpMessage()} and records the given
 * partition key in its message annotations.
 *
 * @param partitionKey the value stored under {@code AmqpConstants.PARTITION_KEY}.
 * @return the AMQP message carrying the partition key annotation.
 */
Message toAmqpMessage(final String partitionKey) {
  final Message amqpMessage = this.toAmqpMessage();

  MessageAnnotations messageAnnotations = amqpMessage.getMessageAnnotations();
  if (messageAnnotations == null) {
    // The message has no annotations section yet; start with an empty one.
    messageAnnotations = new MessageAnnotations(new HashMap<>());
  }

  messageAnnotations.getValue().put(AmqpConstants.PARTITION_KEY, partitionKey);
  amqpMessage.setMessageAnnotations(messageAnnotations);
  return amqpMessage;
}

代码示例来源:origin: eclipse/hono

/**
 * Stores a value under a symbol key in the <em>message annotations</em> of an
 * AMQP 1.0 message, creating the annotations section if the message has none.
 *
 * @param msg the message whose annotations are updated.
 * @param key the name of the symbol under which the value is stored.
 * @param value the value to store.
 */
public static void addAnnotation(final Message msg, final String key, final Object value) {
  final MessageAnnotations existing = msg.getMessageAnnotations();
  final MessageAnnotations target;
  if (existing != null) {
    target = existing;
  } else {
    // No annotations yet: attach a fresh, empty section to the message first.
    target = new MessageAnnotations(new HashMap<>());
    msg.setMessageAnnotations(target);
  }
  target.getValue().put(Symbol.getSymbol(key), value);
}

代码示例来源:origin: strimzi/strimzi-kafka-bridge

/**
 * Builds an AMQP message from a Kafka record whose value holds an encoded
 * AMQP message, then replaces its message annotations with the record's
 * partition, offset, key and topic.
 *
 * @param address the address set on the new message before decoding.
 * @param record the Kafka record to convert.
 * @return the resulting AMQP message.
 */
@Override
public Message toMessage(String address, KafkaConsumerRecord<String, byte[]> record) {

  Message message = Proton.message();
  message.setAddress(address);

  // NOTE(review): the address is set before decode(); confirm that decode()
  // does not reset it.
  final byte[] encoded = record.value();
  message.decode(encoded, 0, encoded.length);

  // Kafka coordinates of the record; the key is stored as-is, even when null.
  final Map<Symbol, Object> kafkaAnnotations = new HashMap<>();
  kafkaAnnotations.put(Symbol.valueOf(AmqpBridge.AMQP_PARTITION_ANNOTATION), record.partition());
  kafkaAnnotations.put(Symbol.valueOf(AmqpBridge.AMQP_OFFSET_ANNOTATION), record.offset());
  kafkaAnnotations.put(Symbol.valueOf(AmqpBridge.AMQP_KEY_ANNOTATION), record.key());
  kafkaAnnotations.put(Symbol.valueOf(AmqpBridge.AMQP_TOPIC_ANNOTATION), record.topic());

  message.setMessageAnnotations(new MessageAnnotations(kafkaAnnotations));

  return message;
}

代码示例来源:origin: EnMasseProject/enmasse

/**
 * Marks the message as replicated and forwards it through the given sender.
 * Once the send completes, the outcome is mirrored onto the source delivery
 * and the receiver is granted credit.
 *
 * @param protonSender the sender used to forward the message.
 * @param protonReceiver the receiver that gets credit after the forward completes.
 * @param sourceDelivery the delivery the message originally arrived on.
 * @param message the message to annotate and forward.
 */
private void forwardMessage(ProtonSender protonSender, ProtonReceiver protonReceiver, ProtonDelivery sourceDelivery, Message message) {
  MessageAnnotations annotations = message.getMessageAnnotations();
  if (annotations == null || annotations.getValue() == null) {
    // Use a mutable map: an immutable singleton map would make any later
    // getValue().put(...) on this message (including the put below on a
    // repeated forward) throw UnsupportedOperationException.
    java.util.Map<Symbol, Object> values = new java.util.HashMap<>();
    annotations = new MessageAnnotations(values);
  }
  annotations.getValue().put(replicated, true);
  message.setMessageAnnotations(annotations);
  protonSender.send(message, protonDelivery -> {
    // Mirror the remote outcome of the forwarded delivery onto the source delivery.
    sourceDelivery.disposition(protonDelivery.getRemoteState(), protonDelivery.remotelySettled());
    // Grant the receiver the difference between sender credit and its own credit.
    protonReceiver.flow(protonSender.getCredit() - protonReceiver.getCredit());
  });
}

代码示例来源:origin: strimzi/strimzi-kafka-bridge

// Attach the prepared annotations section to the message.
message.setMessageAnnotations(messageAnnotations);

代码示例来源:origin: strimzi/strimzi-kafka-bridge

/**
 * Builds an AMQP message from a Kafka record: the record's partition, offset,
 * key and topic become message annotations, and the record value becomes a
 * binary data body.
 *
 * @param address the address set on the new message.
 * @param record the Kafka record to convert.
 * @return the resulting AMQP message.
 */
@Override
public Message toMessage(String address, KafkaConsumerRecord<String, byte[]> record) {

  Message message = Proton.message();
  message.setAddress(address);

  // Kafka coordinates of the record; the key is stored as-is, even when null.
  final Map<Symbol, Object> kafkaAnnotations = new HashMap<>();
  kafkaAnnotations.put(Symbol.valueOf(AmqpBridge.AMQP_PARTITION_ANNOTATION), record.partition());
  kafkaAnnotations.put(Symbol.valueOf(AmqpBridge.AMQP_OFFSET_ANNOTATION), record.offset());
  kafkaAnnotations.put(Symbol.valueOf(AmqpBridge.AMQP_KEY_ANNOTATION), record.key());
  kafkaAnnotations.put(Symbol.valueOf(AmqpBridge.AMQP_TOPIC_ANNOTATION), record.topic());

  message.setMessageAnnotations(new MessageAnnotations(kafkaAnnotations));

  // The raw record value is carried as the message body.
  final Binary payload = new Binary(record.value());
  message.setBody(new Data(payload));

  return message;
}

代码示例来源:origin: eclipse/hono

// Build an annotations map that flags the application correlation id option as true,
// then attach it to the message as its message-annotations section.
final Map<Symbol, Object> annotations = new HashMap<>();
annotations.put(Symbol.valueOf(MessageHelper.ANNOTATION_X_OPT_APP_CORRELATION_ID), true);
message.setMessageAnnotations(new MessageAnnotations(annotations));

代码示例来源:origin: org.eclipse.hono/hono-core

// Build an annotations map that flags the application correlation id option as true,
// then attach it to the message as its message-annotations section.
final Map<Symbol, Object> annotations = new HashMap<>();
annotations.put(Symbol.valueOf(MessageHelper.ANNOTATION_X_OPT_APP_CORRELATION_ID), true);
message.setMessageAnnotations(new MessageAnnotations(annotations));

代码示例来源:origin: Azure/azure-event-hubs-java

: amqpMessage.getMessageAnnotations();
// Store the system property under a symbol built from its key, then (re)attach
// the annotations section to the AMQP message.
messageAnnotations.getValue().put(Symbol.getSymbol(systemProperty.getKey()), systemProperty.getValue());
amqpMessage.setMessageAnnotations(messageAnnotations);

代码示例来源:origin: Azure/azure-event-hubs-java

// The batch message reuses the annotations section of the first message.
batchMessage.setMessageAnnotations(firstMessage.getMessageAnnotations());

代码示例来源:origin: Azure/azure-service-bus-java

// The batch message reuses the annotations section of the first message.
batchMessage.setMessageAnnotations(firstMessage.getMessageAnnotations());

代码示例来源:origin: EnMasseProject/enmasse

/**
 * Converts this object into a raw AMQP message: fixed subject, retain and QoS
 * as message annotations, the topic as address, a header whose durable flag is
 * set for every QoS other than AT_MOST_ONCE, and the payload (if any) as a
 * binary data body.
 *
 * @return the AMQP message built from this object's fields.
 */
public Message toAmqp() {
  final Message amqp = ProtonHelper.message();
  amqp.setSubject(AMQP_SUBJECT);

  final Map<Symbol, Object> annotationValues = new HashMap<>();
  annotationValues.put(Symbol.valueOf(AMQP_RETAIN_ANNOTATION), this.isRetain);
  annotationValues.put(Symbol.valueOf(AMQP_QOS_ANNOTATION), this.qos.value());
  amqp.setMessageAnnotations(new MessageAnnotations(annotationValues));

  amqp.setAddress(this.topic);

  final boolean durable = this.qos != MqttQoS.AT_MOST_ONCE;
  final Header header = new Header();
  header.setDurable(durable);
  amqp.setHeader(header);

  // A missing payload simply leaves the message without a body.
  if (this.payload != null) {
    amqp.setBody(new Data(new Binary(this.payload.getBytes())));
  }
  return amqp;
}

代码示例来源:origin: EnMasseProject/enmasse

/**
 * Converts this object into a raw AMQP message: fixed subject, retain and QoS
 * as message annotations, the topic as address, a header whose durable flag is
 * set for every QoS other than AT_MOST_ONCE, and the payload (if any) as a
 * binary data body.
 *
 * @return the AMQP message built from this object's fields.
 */
public Message toAmqp() {
  final Message amqp = ProtonHelper.message();
  amqp.setSubject(AMQP_SUBJECT);

  final Map<Symbol, Object> annotationValues = new HashMap<>();
  annotationValues.put(Symbol.valueOf(AMQP_RETAIN_ANNOTATION), this.isRetain);
  annotationValues.put(Symbol.valueOf(AMQP_QOS_ANNOTATION), this.qos.value());
  amqp.setMessageAnnotations(new MessageAnnotations(annotationValues));

  amqp.setAddress(this.topic);

  final boolean durable = this.qos != MqttQoS.AT_MOST_ONCE;
  final Header header = new Header();
  header.setDurable(durable);
  amqp.setHeader(header);

  // A missing payload simply leaves the message without a body.
  if (this.payload != null) {
    amqp.setBody(new Data(new Binary(this.payload.getBytes())));
  }
  return amqp;
}

代码示例来源:origin: EnMasseProject/enmasse

/**
 * Converts this object into a raw AMQP message: retain and QoS as message
 * annotations, the topic as address, a header whose durable flag is set for
 * every QoS other than AT_MOST_ONCE, a delivery count of 1 for duplicates
 * (0 otherwise), and the payload (if any) as a binary data body.
 *
 * @return the AMQP message built from this object's fields.
 */
public Message toAmqp() {
  final Message amqp = ProtonHelper.message();

  final Map<Symbol, Object> annotationValues = new HashMap<>();
  annotationValues.put(Symbol.valueOf(AMQP_RETAIN_ANNOTATION), this.isRetain);
  annotationValues.put(Symbol.valueOf(AMQP_QOS_ANNOTATION), this.qos.value());
  amqp.setMessageAnnotations(new MessageAnnotations(annotationValues));

  amqp.setAddress(this.topic);

  final boolean durable = this.qos != MqttQoS.AT_MOST_ONCE;
  final Header header = new Header();
  header.setDurable(durable);
  amqp.setHeader(header);

  // The delivery count is set after the header has been attached.
  amqp.setDeliveryCount(this.isDup ? 1 : 0);

  // A missing payload simply leaves the message without a body.
  if (this.payload != null) {
    amqp.setBody(new Data(new Binary(this.payload.getBytes())));
  }
  return amqp;
}

代码示例来源:origin: EnMasseProject/enmasse

/**
 * Converts this object into a raw AMQP message: the message id, retain and QoS
 * as message annotations, the topic as address, a header whose durable flag is
 * set for every QoS other than AT_MOST_ONCE, a delivery count of 1 for
 * duplicates (0 otherwise), and the payload (if any) as a binary data body.
 *
 * @return the AMQP message built from this object's fields.
 */
public Message toAmqp() {
  final Message amqp = ProtonHelper.message();
  amqp.setMessageId(this.messageId);

  final Map<Symbol, Object> annotationValues = new HashMap<>();
  annotationValues.put(Symbol.valueOf(AMQP_RETAIN_ANNOTATION), this.isRetain);
  annotationValues.put(Symbol.valueOf(AMQP_QOS_ANNOTATION), this.qos.value());
  amqp.setMessageAnnotations(new MessageAnnotations(annotationValues));

  amqp.setAddress(this.topic);

  final boolean durable = this.qos != MqttQoS.AT_MOST_ONCE;
  final Header header = new Header();
  header.setDurable(durable);
  amqp.setHeader(header);

  // The delivery count is set after the header has been attached.
  amqp.setDeliveryCount(this.isDup ? 1 : 0);

  // A missing payload simply leaves the message without a body.
  if (this.payload != null) {
    amqp.setBody(new Data(new Binary(this.payload.getBytes())));
  }
  return amqp;
}

代码示例来源:origin: io.vertx/vertx-amqp-bridge

// Attach the annotations section held in ma to the proton message.
protonMsg.setMessageAnnotations(ma);

代码示例来源:origin: Azure/azure-service-bus-java

// Wrap the annotations map in a MessageAnnotations section and attach it to the AMQP message.
amqpMessage.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap));

代码示例来源:origin: io.vertx/vertx-amqp-bridge

/**
 * Verifies that message annotations set on a proton message appear in the
 * JSON object produced by the translator, keyed by their annotation names.
 */
@Test
public void testAMQP_to_JSON_VerifyMessageAnnotations() {
  // Two annotation entries, A and B, that are expected in the converted JSON.
  final String keyNameA = "testAnnKeyA";
  final String valueA = "testAnnValueA";
  final String keyNameB = "testAnnKeyB";
  final String valueB = "testAnnValueB";

  // The section wraps the map first; the entries are added to the map afterwards.
  final Map<Symbol, Object> annotationValues = new HashMap<>();
  final MessageAnnotations section = new MessageAnnotations(annotationValues);
  annotationValues.put(Symbol.valueOf(keyNameA), valueA);
  annotationValues.put(Symbol.valueOf(keyNameB), valueB);

  final Message protonMsg = Proton.message();
  protonMsg.setMessageAnnotations(section);

  final JsonObject converted = translator.convertToJsonObject(protonMsg);
  assertNotNull("expected converted msg", converted);
  assertTrue("expected message annotations element key to be present",
      converted.containsKey(AmqpConstants.MESSAGE_ANNOTATIONS));

  final JsonObject convertedAnnotations = converted.getJsonObject(AmqpConstants.MESSAGE_ANNOTATIONS);
  assertNotNull("expected message annotations element value to be non-null", convertedAnnotations);

  assertTrue("expected key to be present", convertedAnnotations.containsKey(keyNameA));
  assertEquals("expected value to be equal", valueA, convertedAnnotations.getValue(keyNameA));
  assertTrue("expected key to be present", convertedAnnotations.containsKey(keyNameB));
  assertEquals("expected value to be equal", valueB, convertedAnnotations.getValue(keyNameB));
}

相关文章