org.apache.rocketmq.common.message.Message.getBody()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(8.9k)|赞(0)|评价(0)|浏览(246)

本文整理了Java中org.apache.rocketmq.common.message.Message.getBody()方法的一些代码示例,展示了Message.getBody()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Message.getBody()方法的具体详情如下:
包路径:org.apache.rocketmq.common.message.Message
类名称:Message
方法名:getBody

Message.getBody介绍

暂无

代码示例

代码示例来源:origin: spring-cloud-incubator/spring-cloud-alibaba

/**
   * Decides the outcome of the local transaction branch for a message.
   *
   * <p>When the user property {@code "test"} equals {@code "1"} the branch is rolled back,
   * otherwise it is committed. In both cases the message body, decoded with the platform
   * default charset, is printed to standard output followed by the chosen action.
   *
   * @param msg the message whose user property and body are inspected
   * @param arg not used by this implementation
   * @return {@code ROLLBACK_MESSAGE} when the "test" property is "1", else {@code COMMIT_MESSAGE}
   */
  @Override
  public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
    final boolean rollback = "1".equals(msg.getUserProperty("test"));
    System.out.println(new String(msg.getBody()) + (rollback ? " rollback" : " commit"));
    return rollback
      ? LocalTransactionState.ROLLBACK_MESSAGE
      : LocalTransactionState.COMMIT_MESSAGE;
  }
}

代码示例来源:origin: apache/rocketmq-externals

/**
   * Handles a failed publish: counts down the latch, increments the error counter and logs
   * the message body together with the failure.
   *
   * @param e the failure reported for the send
   */
  @Override
  public void onException(Throwable e) {
    latch.countDown();
    errorNum.incrementAndGet();
    // Decoding with the UTF_8 Charset constant cannot throw UnsupportedEncodingException, so
    // the former catch branch (which logged the publish failure instead of the encoding
    // error, and dropped the body) is no longer needed. The fully qualified name avoids
    // requiring a new import.
    final String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    log.error("Message publish failed,body=" + body, e);
  }
}

代码示例来源:origin: apache/rocketmq-externals

/**
 * Handles a successful send: counts down the latch and, when debug logging is enabled,
 * logs the message body together with the send result.
 *
 * @param sendResult the result reported for the send
 */
@Override
public void onSuccess(SendResult sendResult) {
  latch.countDown();
  if (log.isDebugEnabled()) {
    // The UTF_8 Charset constant cannot throw UnsupportedEncodingException, so the
    // unreachable catch branch is gone. Fully qualified to avoid requiring a new import.
    final String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    log.debug("Sent event,body={},sendResult={}", body, sendResult);
  }
}

代码示例来源:origin: apache/rocketmq

/**
 * Sends the given message through {@code producer} with {@code sendCallback}, then records
 * the body (decoded with the platform default charset) and the original object.
 *
 * <p>Any exception raised while sending or recording is printed and not rethrown.
 *
 * @param msg the object to send; it is cast to {@code Message}
 */
public void asyncSend(Object msg) {
  final Message outgoing = (Message) msg;
  try {
    producer.send(outgoing, sendCallback);
    final String bodyText = new String(outgoing.getBody());
    msgBodys.addData(bodyText);
    originMsgs.addData(msg);
  } catch (Exception failure) {
    failure.printStackTrace();
  }
}

代码示例来源:origin: apache/rocketmq

/**
 * Sends the given message through {@code producer} using a queue selector and
 * {@code sendCallback}, then records the body (decoded with the platform default charset)
 * and the original object.
 *
 * <p>Any exception raised while sending or recording is printed and not rethrown.
 *
 * @param msg the object to send; it is cast to {@code Message}
 * @param selector the queue selector handed to the producer
 * @param arg the argument handed to the producer alongside the selector
 */
public void asyncSend(Object msg, MessageQueueSelector selector, Object arg) {
  final Message outgoing = (Message) msg;
  try {
    producer.send(outgoing, selector, arg, sendCallback);
    final String bodyText = new String(outgoing.getBody());
    msgBodys.addData(bodyText);
    originMsgs.addData(msg);
  } catch (Exception failure) {
    failure.printStackTrace();
  }
}

代码示例来源:origin: apache/rocketmq

/**
 * Sends the given message through {@code producer} to a specific queue with
 * {@code sendCallback}, then records the body (decoded with the platform default charset)
 * and the original object.
 *
 * <p>Any exception raised while sending or recording is printed and not rethrown.
 *
 * @param msg the object to send; it is cast to {@code Message}
 * @param mq the message queue handed to the producer
 */
public void asyncSend(Object msg, MessageQueue mq) {
  final Message outgoing = (Message) msg;
  try {
    producer.send(outgoing, mq, sendCallback);
    final String bodyText = new String(outgoing.getBody());
    msgBodys.addData(bodyText);
    originMsgs.addData(msg);
  } catch (Exception failure) {
    failure.printStackTrace();
  }
}

代码示例来源:origin: apache/rocketmq

/**
 * Sends the given message one-way through {@code producer} using a queue selector, then
 * records the body (decoded with the platform default charset) and the original object.
 *
 * <p>Any exception raised while sending or recording is printed and not rethrown.
 *
 * @param msg the object to send; it is cast to {@code Message}
 * @param selector the queue selector handed to the producer
 * @param arg the argument handed to the producer alongside the selector
 */
public void sendOneWay(Object msg, MessageQueueSelector selector, Object arg) {
  final Message outgoing = (Message) msg;
  try {
    producer.sendOneway(outgoing, selector, arg);
    final String bodyText = new String(outgoing.getBody());
    msgBodys.addData(bodyText);
    originMsgs.addData(msg);
  } catch (Exception failure) {
    failure.printStackTrace();
  }
}

代码示例来源:origin: apache/rocketmq

/**
 * Sends the given message one-way through {@code producer}, then records the body
 * (decoded with the platform default charset) and the original object.
 *
 * <p>Any exception raised while sending or recording is printed and not rethrown.
 *
 * @param msg the object to send; it is cast to {@code Message}
 */
public void sendOneWay(Object msg) {
  final Message outgoing = (Message) msg;
  try {
    producer.sendOneway(outgoing);
    final String bodyText = new String(outgoing.getBody());
    msgBodys.addData(bodyText);
    originMsgs.addData(msg);
  } catch (Exception failure) {
    failure.printStackTrace();
  }
}

代码示例来源:origin: apache/rocketmq

/**
 * Sends the given message one-way through {@code producer} to a specific queue, then
 * records the body (decoded with the platform default charset) and the original object.
 *
 * <p>Any exception raised while sending or recording is printed and not rethrown.
 *
 * @param msg the object to send; it is cast to {@code Message}
 * @param mq the message queue handed to the producer
 */
public void sendOneWay(Object msg, MessageQueue mq) {
  final Message outgoing = (Message) msg;
  try {
    producer.sendOneway(outgoing, mq);
    final String bodyText = new String(outgoing.getBody());
    msgBodys.addData(bodyText);
    originMsgs.addData(msg);
  } catch (Exception failure) {
    failure.printStackTrace();
  }
}

代码示例来源:origin: apache/rocketmq

for (; nextIndex < messages.size(); nextIndex++) {
  Message message = messages.get(nextIndex);
  int tmpSize = message.getTopic().length() + message.getBody().length;
  Map<String, String> properties = message.getProperties();
  for (Map.Entry<String, String> entry : properties.entrySet()) {

代码示例来源:origin: apache/rocketmq

public static byte[] encodeMessage(Message message) {
  byte[] body = message.getBody();
  int bodyLen = body.length;
  String properties = messageProperties2String(message.getProperties());

代码示例来源:origin: apache/storm

/**
   * Builds tuple values from a message by running its body through the given scheme.
   *
   * <p>When the message has keys and the scheme is a {@code KeyValueScheme}, the keys
   * (UTF-8 encoded) and the body are deserialized together; otherwise only the body is
   * deserialized.
   *
   * @param msg RocketMQ message supplying the keys and the body
   * @param scheme scheme used for deserializing
   * @return the values produced by the scheme
   */
  public static List<Object> generateTuples(Message msg, Scheme scheme) {
    final String keys = msg.getKeys();
    final ByteBuffer payload = ByteBuffer.wrap(msg.getBody());

    final boolean keyed = keys != null && scheme instanceof KeyValueScheme;
    if (!keyed) {
      return scheme.deserialize(payload);
    }

    final ByteBuffer keyBuffer = ByteBuffer.wrap(keys.getBytes(StandardCharsets.UTF_8));
    return ((KeyValueScheme) scheme).deserializeKeyAndValue(keyBuffer, payload);
  }
}

代码示例来源:origin: apache/rocketmq

/**
 * Validates a message before it is handed to the producer.
 *
 * <p>Checks, in order: the message is non-null, its topic passes
 * {@code Validators.checkTopic}, its body is non-null, the body is non-empty, and the body
 * is no longer than the producer's maximum message size.
 *
 * @param msg the message to validate
 * @param defaultMQProducer the producer whose maximum message size is the upper bound
 * @throws MQClientException with {@code ResponseCode.MESSAGE_ILLEGAL} when the message or
 *     its body is null, the body is empty, or the body exceeds the maximum size
 */
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
  throws MQClientException {
  if (msg == null) {
    throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
  }

  // Topic rules live in Validators.
  Validators.checkTopic(msg.getTopic());

  // Body must exist, be non-empty and fit within the producer's limit.
  final byte[] payload = msg.getBody();
  if (payload == null) {
    throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
  }

  final int payloadLength = payload.length;
  if (payloadLength == 0) {
    throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
  }

  final int maxSize = defaultMQProducer.getMaxMessageSize();
  if (payloadLength > maxSize) {
    throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
      "the message body size over max value, MAX: " + maxSize);
  }
}

代码示例来源:origin: apache/rocketmq

/**
 * Sends a message synchronously through {@code producer} and records the outcome.
 *
 * <p>On success the elapsed time, message id, send status, broker name (stored via
 * {@code setBrokerIp}), body text and original object are recorded, and the body text is
 * mapped to the producer's result. On any exception the shared result is marked as failed,
 * the exception is stored on it, and the object is added to the error messages; the stack
 * trace is printed only when {@code isDebug} is set.
 *
 * @param msg the object to send; it is cast to {@code Message}
 * @param orderKey not used by this implementation
 * @return the shared {@code sendResult} wrapper, updated with the outcome
 */
public ResultWrapper send(Object msg, Object orderKey) {
  final Message outgoing = (Message) msg;
  try {
    final long startedAt = System.currentTimeMillis();
    final org.apache.rocketmq.client.producer.SendResult brokerResult = producer.send(outgoing);
    this.msgRTs.addData(System.currentTimeMillis() - startedAt);
    if (isDebug) {
      logger.info(brokerResult);
    }

    sendResult.setMsgId(brokerResult.getMsgId());
    sendResult.setSendResult(brokerResult.getSendStatus().equals(SendStatus.SEND_OK));
    sendResult.setBrokerIp(brokerResult.getMessageQueue().getBrokerName());

    final String bodyText = new String(outgoing.getBody());
    msgBodys.addData(bodyText);
    originMsgs.addData(msg);
    originMsgIndex.put(bodyText, brokerResult);
  } catch (Exception failure) {
    if (isDebug) {
      failure.printStackTrace();
    }
    sendResult.setSendResult(false);
    sendResult.setSendException(failure);
    errorMsgs.addData(msg);
  }
  return sendResult;
}

代码示例来源:origin: apache/rocketmq

/**
 * Sends a message synchronously to a specific queue through {@code producer} and records
 * the outcome.
 *
 * <p>On success the elapsed time, message id, send status, broker name (stored via
 * {@code setBrokerIp}), body text and message are recorded, and the body text is mapped to
 * the producer's result. On any exception the shared result is marked as failed, the
 * exception is stored on it, and the message is added to the error messages; the stack
 * trace is printed only when {@code isDebug} is set.
 *
 * @param msg the message to send
 * @param mq the message queue handed to the producer
 * @return the shared {@code sendResult} wrapper, updated with the outcome
 */
public ResultWrapper sendMQ(Message msg, MessageQueue mq) {
  try {
    final long startedAt = System.currentTimeMillis();
    final org.apache.rocketmq.client.producer.SendResult brokerResult = producer.send(msg, mq);
    this.msgRTs.addData(System.currentTimeMillis() - startedAt);
    if (isDebug) {
      logger.info(brokerResult);
    }

    sendResult.setMsgId(brokerResult.getMsgId());
    sendResult.setSendResult(brokerResult.getSendStatus().equals(SendStatus.SEND_OK));
    sendResult.setBrokerIp(brokerResult.getMessageQueue().getBrokerName());

    final String bodyText = new String(msg.getBody());
    msgBodys.addData(bodyText);
    originMsgs.addData(msg);
    originMsgIndex.put(bodyText, brokerResult);
  } catch (Exception failure) {
    if (isDebug) {
      failure.printStackTrace();
    }
    sendResult.setSendResult(false);
    sendResult.setSendException(failure);
    errorMsgs.addData(msg);
  }
  return sendResult;
}

代码示例来源:origin: apache/rocketmq

/**
 * Creates a new {@code Message} carrying the topic, body, flag and properties of the
 * given one.
 *
 * <p>The body array and the properties object obtained from {@code msg} are passed to the
 * new message as-is; whether they are copied internally is not visible here.
 *
 * @param msg the message to duplicate
 * @return a new message populated from {@code msg}
 */
public static Message cloneMessage(final Message msg) {
  final String topic = msg.getTopic();
  final byte[] body = msg.getBody();
  final Message duplicate = new Message(topic, body);
  duplicate.setFlag(msg.getFlag());
  duplicate.setProperties(msg.getProperties());
  return duplicate;
}

代码示例来源:origin: apache/rocketmq

// Use the message body as the body of the request.
request.setBody(msg.getBody());

代码示例来源:origin: apache/rocketmq

/**
 * Compresses the message body in place when it is large enough.
 *
 * <p>Nothing is done for {@code MessageBatch} instances, for a null body, or for a body
 * shorter than the producer's compression threshold. If compression throws an
 * {@code IOException}, the error and the message are logged and the body is left unchanged.
 *
 * @param msg the message whose body may be replaced by its compressed form
 * @return {@code true} only when the body was replaced with compressed data
 */
private boolean tryToCompressMessage(final Message msg) {
  // Batches are not compressed at the moment.
  if (msg instanceof MessageBatch) {
    return false;
  }

  final byte[] rawBody = msg.getBody();
  if (rawBody == null
    || rawBody.length < this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
    return false;
  }

  try {
    final byte[] compressed = UtilAll.compress(rawBody, zipCompressLevel);
    if (compressed == null) {
      return false;
    }
    msg.setBody(compressed);
    return true;
  } catch (IOException e) {
    log.error("tryToCompressMessage exception", e);
    log.warn(msg.toString());
  }
  return false;
}

代码示例来源:origin: apache/rocketmq

/**
 * Prepares trace data before a message is sent.
 *
 * <p>Does nothing when the context is null or when the message topic starts with the trace
 * topic name of the local dispatcher. Otherwise a new {@code TraceContext} of type
 * {@code TraceType.Pub} is attached to the send context, holding a single
 * {@code TraceBean} filled from the message and the send context.
 *
 * @param context the send context; may be null
 */
@Override
public void sendMessageBefore(SendMessageContext context) {
  if (context == null) {
    return;
  }
  // Messages addressed to the trace topic are trace data themselves and are not recorded.
  final String topic = context.getMessage().getTopic();
  final String traceTopic = ((AsyncTraceDispatcher) localDispatcher).getTraceTopicName();
  if (topic.startsWith(traceTopic)) {
    return;
  }

  // Trace context attached to the send context.
  final TraceContext traceContext = new TraceContext();
  traceContext.setTraceBeans(new ArrayList<TraceBean>(1));
  context.setMqTraceContext(traceContext);
  traceContext.setTraceType(TraceType.Pub);
  traceContext.setGroupName(context.getProducerGroup());

  // Trace bean describing the outgoing message.
  final TraceBean bean = new TraceBean();
  bean.setTopic(topic);
  bean.setTags(context.getMessage().getTags());
  bean.setKeys(context.getMessage().getKeys());
  bean.setStoreHost(context.getBrokerAddr());
  bean.setBodyLength(context.getMessage().getBody().length);
  bean.setMsgType(context.getMsgType());
  traceContext.getTraceBeans().add(bean);
}

代码示例来源:origin: apache/rocketmq

/**
 * Builds a {@code MessageExtBrokerInner} from the given message and target queue.
 *
 * <p>Topic, body, tags and properties are taken from {@code message}; the queue id is taken
 * from {@code messageQueue}. The born host and store host are both set to
 * {@code this.storeHost}, and the born timestamp is the current time.
 *
 * @param message the message supplying topic, body, tags and properties
 * @param messageQueue the queue supplying the queue id
 * @return the populated inner message
 */
private MessageExtBrokerInner makeOpMessageInner(Message message, MessageQueue messageQueue) {
  MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
  msgInner.setTopic(message.getTopic());
  msgInner.setBody(message.getBody());
  msgInner.setQueueId(messageQueue.getQueueId());
  msgInner.setTags(message.getTags());
  // The tags code is derived from the tags that were just set on msgInner.
  msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(msgInner.getTags()));
  msgInner.setSysFlag(0);
  // Properties are applied both as an object and in their string-encoded form.
  MessageAccessor.setProperties(msgInner, message.getProperties());
  msgInner.setPropertiesString(MessageDecoder.messageProperties2String(message.getProperties()));
  msgInner.setBornTimestamp(System.currentTimeMillis());
  // The same host value is used for both the born host and the store host.
  msgInner.setBornHost(this.storeHost);
  msgInner.setStoreHost(this.storeHost);
  msgInner.setWaitStoreMsgOK(false);
  // Called last, after all other fields have been populated.
  MessageClientIDSetter.setUniqID(msgInner);
  return msgInner;
}

相关文章