Usage and code examples of the org.apache.rocketmq.common.message.Message.getKeys() method

x33g5p2x, reposted on 2022-01-25 under "Other"

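This article collects code examples for the Java method org.apache.rocketmq.common.message.Message.getKeys() and shows how Message.getKeys() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Message.getKeys() method:
Package path: org.apache.rocketmq.common.message.Message
Class name: Message
Method name: getKeys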

Introduction to Message.getKeys

None available.
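
As a hedged supplement: in the RocketMQ client sources I am familiar with, the keys are held as one string property on the message, and `setKeys(Collection<String>)` joins the individual keys with a single space, so `getKeys()` returns that joined string, or `null` when no key was set. The sketch below models this behaviour with a plain map. `KeysModel`, its `"KEYS"` property name, and its `" "` separator are assumptions made for illustration; this is not the library class itself.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical model of how message keys are stored; not the RocketMQ class. */
final class KeysModel {
    // Assumed property name and separator, chosen to mirror the behaviour described above.
    static final String PROPERTY_KEYS = "KEYS";
    static final String KEY_SEPARATOR = " ";

    private final Map<String, String> properties = new HashMap<>();

    /** Stores a single, already-joined key string. */
    void setKeys(String keys) {
        properties.put(PROPERTY_KEYS, keys);
    }

    /** Joins several keys with the separator and stores the result. */
    void setKeys(Collection<String> keys) {
        setKeys(String.join(KEY_SEPARATOR, keys));
    }

    /** Returns the joined key string, or null if no key was ever set. */
    String getKeys() {
        return properties.get(PROPERTY_KEYS);
    }

    /** Splits the stored string back into individual keys; empty array when unset. */
    String[] splitKeys() {
        String keys = getKeys();
        return (keys == null || keys.isEmpty()) ? new String[0] : keys.split(KEY_SEPARATOR);
    }
}
```

Under this model, an individual key that itself contains a space cannot be recovered by splitting, so callers that need round-tripping would have to avoid spaces in keys.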

Code examples

Code example source: origin: apache/storm

/**
   * Generate Storm tuple values by Message and Scheme.
   * @param msg RocketMQ Message
   * @param scheme Scheme for deserializing
   * @return tuple values
   */
  public static List<Object> generateTuples(Message msg, Scheme scheme) {
    List<Object> tup;
    String rawKey = msg.getKeys();
    ByteBuffer body = ByteBuffer.wrap(msg.getBody());
    if (rawKey != null && scheme instanceof KeyValueScheme) {
      ByteBuffer key = ByteBuffer.wrap(rawKey.getBytes(StandardCharsets.UTF_8));
      tup = ((KeyValueScheme)scheme).deserializeKeyAndValue(key, body);
    } else {
      tup = scheme.deserialize(body);
    }
    return tup;
  }
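
The Storm example above chooses between two deserialization paths depending on whether the message carries a key. A minimal, self-contained sketch of that branch follows. `Scheme`, `KeyValueScheme`, `StringKeyValueScheme`, and `TupleSketch` are simplified stand-ins defined here for illustration, not the Storm or RocketMQ types, and the key is passed as a plain `String` in place of `msg.getKeys()`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a body-only deserializer. */
interface Scheme {
    List<Object> deserialize(ByteBuffer body);
}

/** Hypothetical stand-in for a deserializer that also understands a key. */
interface KeyValueScheme extends Scheme {
    List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer body);
}

/** Toy scheme that decodes key and body as UTF-8 strings. */
final class StringKeyValueScheme implements KeyValueScheme {
    private static String utf8(ByteBuffer buf) {
        // duplicate() leaves the caller's buffer position untouched.
        return StandardCharsets.UTF_8.decode(buf.duplicate()).toString();
    }

    @Override
    public List<Object> deserialize(ByteBuffer body) {
        return Collections.<Object>singletonList(utf8(body));
    }

    @Override
    public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer body) {
        return Arrays.<Object>asList(utf8(key), utf8(body));
    }
}

final class TupleSketch {
    private TupleSketch() {
    }

    /** rawKey plays the role of msg.getKeys(); it may be null when no key was set. */
    static List<Object> generateTuples(String rawKey, byte[] bodyBytes, Scheme scheme) {
        ByteBuffer body = ByteBuffer.wrap(bodyBytes);
        if (rawKey != null && scheme instanceof KeyValueScheme) {
            ByteBuffer key = ByteBuffer.wrap(rawKey.getBytes(StandardCharsets.UTF_8));
            return ((KeyValueScheme) scheme).deserializeKeyAndValue(key, body);
        }
        // No key, or a scheme that ignores keys: deserialize the body alone.
        return scheme.deserialize(body);
    }
}
```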

Code example source: origin: apache/rocketmq

@Override
public void sendMessageBefore(SendMessageContext context) {
  // If this is message trace data itself, do not record it
  if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
    return;
  }
  // Build the trace context (TuxeTraceContext)
  TraceContext tuxeContext = new TraceContext();
  tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
  context.setMqTraceContext(tuxeContext);
  tuxeContext.setTraceType(TraceType.Pub);
  tuxeContext.setGroupName(context.getProducerGroup());
  // Build the trace bean that carries the message trace data
  TraceBean traceBean = new TraceBean();
  traceBean.setTopic(context.getMessage().getTopic());
  traceBean.setTags(context.getMessage().getTags());
  traceBean.setKeys(context.getMessage().getKeys());
  traceBean.setStoreHost(context.getBrokerAddr());
  traceBean.setBodyLength(context.getMessage().getBody().length);
  traceBean.setMsgType(context.getMsgType());
  tuxeContext.getTraceBeans().add(traceBean);
}

Code example source: origin: apache/rocketmq

Assert.assertEquals(message.getTopic(), messageByOffset.getTopic());
Assert.assertEquals(message.getKeys(), messageByOffset.getKeys());
Assert.assertEquals(message.getKeys(), messageByMsgId.getKeys());

Code example source: origin: javahongxi/whatsmars

private SendResult sendOrderly(Message message) throws Exception {
  SendResult sendResult = this.defaultMQProducer.send(message,
      new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
          long id = NumberUtils.toLong(String.valueOf(arg));
          int index = (int) (id % mqs.size());
          return mqs.get(index);
        }
      }, message.getKeys());
  log.debug("send result: {}", sendResult);
  return sendResult;
}
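
The `sendOrderly` example above passes `message.getKeys()` as the selector argument and maps it to a queue with a modulo. Two edge cases follow from that code: a non-numeric key makes `NumberUtils.toLong` fall back to 0, so every such message lands on the first queue, and a negative id yields a negative index. The following is a minimal sketch of a more defensive index computation. `QueueIndex` and `indexFor` are hypothetical names, and the hash fallback is a substitution of mine, not what the whatsmars code does.

```java
/** Hypothetical helper: maps a message key to a queue index in [0, queueCount). */
final class QueueIndex {
    private QueueIndex() {
    }

    static int indexFor(String key, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        String text = String.valueOf(key).trim();
        long id;
        try {
            // Numeric keys keep the "id modulo queue count" routing of the example above.
            id = Long.parseLong(text);
        } catch (NumberFormatException e) {
            // Non-numeric (or null) keys fall back to the string hash instead of always 0.
            id = text.hashCode();
        }
        // floorMod never returns a negative value for a positive modulus.
        return (int) Math.floorMod(id, (long) queueCount);
    }
}
```

Inside a `MessageQueueSelector`, the result could be used as `mqs.get(QueueIndex.indexFor(String.valueOf(arg), mqs.size()))`. Messages with the same key still map to the same queue as long as the queue count does not change.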

Code example source: origin: javahongxi/whatsmars

private static SendResult sendOrderly(String producerGroup, Message message, int sendMsgTimeout) throws Exception {
  return getProducer(producerGroup, sendMsgTimeout).send(message,
      new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
          long id = NumberUtils.toLong(String.valueOf(arg));
          int index = (int) (id % mqs.size());
          return mqs.get(index);
        }
      }, message.getKeys());
}

Code example source: origin: didi/DDMQ

Assert.assertEquals(message.getTopic(), messageByOffset.getTopic());
Assert.assertEquals(message.getKeys(), messageByOffset.getKeys());
Assert.assertEquals(message.getKeys(), messageByMsgId.getKeys());

Code example source: origin: maihaoche/rocketmq-spring-boot-starter

/**
 * Sends trace data.
 *
 * @param keySet
 *            the set of keys contained in this batch
 * @param data
 *            the trace data of this batch
 */
public void sendTraceDataByMQ(Set<String> keySet, String data) {
  String topic = OnsTraceConstants.traceTopic;
  final Message message = new Message(topic, data.getBytes());
  message.setKeys(keySet);
  try {
    traceProducer.send(message, new SendCallback() {
      @Override
      public void onSuccess(SendResult sendResult) {
      }
      @Override
      public void onException(Throwable e) {
        // TODO: decide how to persist data that failed to send, so that all trace data is recorded
        clientlog.info("send trace data failed ,the msgidSet is"+message.getKeys());
      }
    }, 5000);
  }
  catch (Exception e) {
    clientlog.info("send trace data failed ,the msgidSet is"+message.getKeys());
  }
}

Code example source: origin: org.apache.rocketmq/rocketmq-client

@Override
public void sendMessageBefore(SendMessageContext context) {
  // If this is message trace data itself, do not record it
  if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
    return;
  }
  // Build the trace context (TuxeTraceContext)
  TraceContext tuxeContext = new TraceContext();
  tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
  context.setMqTraceContext(tuxeContext);
  tuxeContext.setTraceType(TraceType.Pub);
  tuxeContext.setGroupName(context.getProducerGroup());
  // Build the trace bean that carries the message trace data
  TraceBean traceBean = new TraceBean();
  traceBean.setTopic(context.getMessage().getTopic());
  traceBean.setTags(context.getMessage().getTags());
  traceBean.setKeys(context.getMessage().getKeys());
  traceBean.setStoreHost(context.getBrokerAddr());
  traceBean.setBodyLength(context.getMessage().getBody().length);
  traceBean.setMsgType(context.getMsgType());
  tuxeContext.getTraceBeans().add(traceBean);
}

Code example source: origin: org.apache.rocketmq/rocketmq-spring-boot

public static org.springframework.messaging.Message convertToSpringMessage(
  org.apache.rocketmq.common.message.Message message) {
  org.springframework.messaging.Message retMessage =
    MessageBuilder.withPayload(message.getBody()).
      setHeader(RocketMQHeaders.KEYS, message.getKeys()).
      setHeader(RocketMQHeaders.TAGS, message.getTags()).
      setHeader(RocketMQHeaders.TOPIC, message.getTopic()).
      setHeader(RocketMQHeaders.FLAG, message.getFlag()).
      setHeader(RocketMQHeaders.TRANSACTION_ID, message.getTransactionId()).
      setHeader(RocketMQHeaders.PROPERTIES, message.getProperties()).
      build();
  return retMessage;
}
