本文整理了 Java 中 org.apache.rocketmq.common.message.Message.getBody() 方法的一些代码示例,展示了 Message.getBody() 的具体用法。这些代码示例主要来源于 GitHub、Stack Overflow、Maven 等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Message.getBody() 方法的具体详情如下:
包路径:org.apache.rocketmq.common.message.Message
类名称:Message
方法名:getBody
方法说明:暂无
代码示例来源:origin: spring-cloud-incubator/spring-cloud-alibaba
/**
 * Decides the outcome of the local transaction branch for {@code msg}.
 *
 * <p>The message is rolled back when its user property {@code "test"} equals {@code "1"};
 * otherwise it is committed. In both cases the body (decoded with the JVM default charset)
 * is printed to standard output followed by the chosen outcome.
 *
 * @param msg the message whose user property and body are inspected
 * @param arg not used by this implementation
 * @return {@code ROLLBACK_MESSAGE} or {@code COMMIT_MESSAGE}
 */
@Override
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
    final boolean shouldRollback = "1".equals(msg.getUserProperty("test"));
    final String payload = new String(msg.getBody());
    if (shouldRollback) {
        System.out.println(payload + " rollback");
        return LocalTransactionState.ROLLBACK_MESSAGE;
    } else {
        System.out.println(payload + " commit");
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
}
代码示例来源:origin: apache/rocketmq-externals
/**
 * Handles a failed publish: releases the latch, bumps the error counter and logs the
 * message body together with the failure cause.
 *
 * <p>The body is decoded with {@link java.nio.charset.StandardCharsets#UTF_8}. Unlike the
 * charset-name overload, this constructor declares no checked exception, so the publish
 * failure {@code e} is always the throwable that gets logged.
 *
 * @param e the cause of the publish failure
 */
@Override
public void onException(Throwable e) {
    latch.countDown();
    errorNum.incrementAndGet();
    final String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    log.error("Message publish failed,body=" + body, e);
}
}
代码示例来源:origin: apache/rocketmq-externals
/**
 * Handles a successful send: releases the latch and, when debug logging is enabled,
 * logs the UTF-8 decoded message body together with the send result.
 *
 * @param sendResult the result reported for the sent message
 */
@Override
public void onSuccess(SendResult sendResult) {
    latch.countDown();
    // Skip decoding the body entirely unless the debug line will actually be written.
    if (!log.isDebugEnabled()) {
        return;
    }
    try {
        final String bodyText = new String(message.getBody(), "UTF-8");
        log.debug("Sent event,body={},sendResult={}", bodyText, sendResult);
    } catch (UnsupportedEncodingException encodingFailure) {
        log.error("Encoding error", encodingFailure);
    }
}
代码示例来源:origin: apache/rocketmq
/**
 * Hands {@code msg} to the producer together with {@code sendCallback}, then records the
 * decoded body in {@code msgBodys} and the original object in {@code originMsgs}.
 *
 * <p>The cast to {@link Message} happens outside the {@code try}, so a wrong argument type
 * propagates as a {@link ClassCastException}. Any exception raised afterwards is printed to
 * standard error and not rethrown.
 *
 * @param msg the message to send; must be a {@link Message}
 */
public void asyncSend(Object msg) {
    final Message outgoing = (Message) msg;
    try {
        producer.send(outgoing, sendCallback);
        // Body is decoded with the JVM default charset, after the send call has returned.
        final String bodyText = new String(outgoing.getBody());
        msgBodys.addData(bodyText);
        originMsgs.addData(msg);
    } catch (Exception failure) {
        failure.printStackTrace();
    }
}
代码示例来源:origin: apache/rocketmq
/**
 * Hands {@code msg} to the producer together with a queue selector, its argument and
 * {@code sendCallback}, then records the decoded body in {@code msgBodys} and the original
 * object in {@code originMsgs}.
 *
 * <p>The cast to {@link Message} happens outside the {@code try}, so a wrong argument type
 * propagates as a {@link ClassCastException}. Any exception raised afterwards is printed to
 * standard error and not rethrown.
 *
 * @param msg      the message to send; must be a {@link Message}
 * @param selector the queue selector passed through to the producer
 * @param arg      the selector argument passed through to the producer
 */
public void asyncSend(Object msg, MessageQueueSelector selector, Object arg) {
    final Message outgoing = (Message) msg;
    try {
        producer.send(outgoing, selector, arg, sendCallback);
        // Body is decoded with the JVM default charset, after the send call has returned.
        final String bodyText = new String(outgoing.getBody());
        msgBodys.addData(bodyText);
        originMsgs.addData(msg);
    } catch (Exception failure) {
        failure.printStackTrace();
    }
}
代码示例来源:origin: apache/rocketmq
/**
 * Hands {@code msg} to the producer for the given queue together with {@code sendCallback},
 * then records the decoded body in {@code msgBodys} and the original object in
 * {@code originMsgs}.
 *
 * <p>The cast to {@link Message} happens outside the {@code try}, so a wrong argument type
 * propagates as a {@link ClassCastException}. Any exception raised afterwards is printed to
 * standard error and not rethrown.
 *
 * @param msg the message to send; must be a {@link Message}
 * @param mq  the target message queue passed through to the producer
 */
public void asyncSend(Object msg, MessageQueue mq) {
    final Message outgoing = (Message) msg;
    try {
        producer.send(outgoing, mq, sendCallback);
        // Body is decoded with the JVM default charset, after the send call has returned.
        final String bodyText = new String(outgoing.getBody());
        msgBodys.addData(bodyText);
        originMsgs.addData(msg);
    } catch (Exception failure) {
        failure.printStackTrace();
    }
}
代码示例来源:origin: apache/rocketmq
/**
 * Passes {@code msg} to {@code producer.sendOneway} with a queue selector and its argument,
 * then records the decoded body in {@code msgBodys} and the original object in
 * {@code originMsgs}.
 *
 * <p>The cast to {@link Message} happens outside the {@code try}, so a wrong argument type
 * propagates as a {@link ClassCastException}. Any exception raised afterwards is printed to
 * standard error and not rethrown.
 *
 * @param msg      the message to send; must be a {@link Message}
 * @param selector the queue selector passed through to the producer
 * @param arg      the selector argument passed through to the producer
 */
public void sendOneWay(Object msg, MessageQueueSelector selector, Object arg) {
    final Message outgoing = (Message) msg;
    try {
        producer.sendOneway(outgoing, selector, arg);
        // Body is decoded with the JVM default charset, after the send call has returned.
        final String bodyText = new String(outgoing.getBody());
        msgBodys.addData(bodyText);
        originMsgs.addData(msg);
    } catch (Exception failure) {
        failure.printStackTrace();
    }
}
代码示例来源:origin: apache/rocketmq
/**
 * Passes {@code msg} to {@code producer.sendOneway}, then records the decoded body in
 * {@code msgBodys} and the original object in {@code originMsgs}.
 *
 * <p>The cast to {@link Message} happens outside the {@code try}, so a wrong argument type
 * propagates as a {@link ClassCastException}. Any exception raised afterwards is printed to
 * standard error and not rethrown.
 *
 * @param msg the message to send; must be a {@link Message}
 */
public void sendOneWay(Object msg) {
    final Message outgoing = (Message) msg;
    try {
        producer.sendOneway(outgoing);
        // Body is decoded with the JVM default charset, after the send call has returned.
        final String bodyText = new String(outgoing.getBody());
        msgBodys.addData(bodyText);
        originMsgs.addData(msg);
    } catch (Exception failure) {
        failure.printStackTrace();
    }
}
代码示例来源:origin: apache/rocketmq
/**
 * Passes {@code msg} to {@code producer.sendOneway} for the given queue, then records the
 * decoded body in {@code msgBodys} and the original object in {@code originMsgs}.
 *
 * <p>The cast to {@link Message} happens outside the {@code try}, so a wrong argument type
 * propagates as a {@link ClassCastException}. Any exception raised afterwards is printed to
 * standard error and not rethrown.
 *
 * @param msg the message to send; must be a {@link Message}
 * @param mq  the target message queue passed through to the producer
 */
public void sendOneWay(Object msg, MessageQueue mq) {
    final Message outgoing = (Message) msg;
    try {
        producer.sendOneway(outgoing, mq);
        // Body is decoded with the JVM default charset, after the send call has returned.
        final String bodyText = new String(outgoing.getBody());
        msgBodys.addData(bodyText);
        originMsgs.addData(msg);
    } catch (Exception failure) {
        failure.printStackTrace();
    }
}
代码示例来源:origin: apache/rocketmq
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
代码示例来源:origin: apache/rocketmq
public static byte[] encodeMessage(Message message) {
byte[] body = message.getBody();
int bodyLen = body.length;
String properties = messageProperties2String(message.getProperties());
代码示例来源:origin: apache/storm
/**
 * Builds Storm tuple values from a RocketMQ message using the supplied scheme.
 *
 * <p>When the message carries keys and the scheme is a {@code KeyValueScheme}, the keys are
 * UTF-8 encoded and deserialized together with the body; in every other case only the body
 * is deserialized.
 *
 * @param msg RocketMQ message supplying the keys and the body
 * @param scheme scheme used for deserializing
 * @return the tuple values produced by the scheme
 */
public static List<Object> generateTuples(Message msg, Scheme scheme) {
    final String keys = msg.getKeys();
    final ByteBuffer payload = ByteBuffer.wrap(msg.getBody());

    // Body-only path: no keys on the message, or the scheme cannot consume a key.
    if (keys == null || !(scheme instanceof KeyValueScheme)) {
        return scheme.deserialize(payload);
    }

    final ByteBuffer keyBuffer = ByteBuffer.wrap(keys.getBytes(StandardCharsets.UTF_8));
    return ((KeyValueScheme) scheme).deserializeKeyAndValue(keyBuffer, payload);
}
}
代码示例来源:origin: apache/rocketmq
/**
 * Validates a message before it is sent.
 *
 * <p>Checks are applied in this order: the message itself must be non-null, its topic must
 * pass {@code Validators.checkTopic}, and its body must be non-null, non-empty and no larger
 * than the producer's configured maximum message size.
 *
 * @param msg the message to validate
 * @param defaultMQProducer the producer supplying the maximum message size
 * @throws MQClientException with {@code ResponseCode.MESSAGE_ILLEGAL} when the message or
 *     body check fails, or as raised by the topic check
 */
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
    throws MQClientException {
    if (msg == null) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
    }

    // Topic is validated before the body is looked at.
    Validators.checkTopic(msg.getTopic());

    final byte[] body = msg.getBody();
    if (body == null) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
    }
    if (body.length == 0) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
    }

    // The producer is only consulted once the body is known to be present and non-empty.
    final int maxSize = defaultMQProducer.getMaxMessageSize();
    if (body.length > maxSize) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
            "the message body size over max value, MAX: " + maxSize);
    }
}
代码示例来源:origin: apache/rocketmq
/**
 * Sends {@code msg} through the producer and fills the shared {@code sendResult} wrapper.
 *
 * <p>On success the elapsed time is added to {@code msgRTs}, the wrapper receives the message
 * id, whether the status equals {@code SEND_OK}, and the broker name, and the decoded body is
 * recorded in {@code msgBodys}, {@code originMsgs} and {@code originMsgIndex}. On any
 * exception the wrapper is marked failed with the exception attached and {@code msg} is added
 * to {@code errorMsgs}; the stack trace is printed only when {@code isDebug} is set.
 *
 * @param msg the message to send; must be a {@link Message}
 * @param orderKey not referenced by this method
 * @return the shared {@code sendResult} wrapper, updated for this call
 */
public ResultWrapper send(Object msg, Object orderKey) {
    final Message outgoing = (Message) msg;
    try {
        final long startedAt = System.currentTimeMillis();
        final org.apache.rocketmq.client.producer.SendResult brokerResult = producer.send(outgoing);
        this.msgRTs.addData(System.currentTimeMillis() - startedAt);
        if (isDebug) {
            logger.info(brokerResult);
        }

        final boolean sentOk = brokerResult.getSendStatus().equals(SendStatus.SEND_OK);
        sendResult.setMsgId(brokerResult.getMsgId());
        sendResult.setSendResult(sentOk);
        sendResult.setBrokerIp(brokerResult.getMessageQueue().getBrokerName());

        // Body is decoded with the JVM default charset and doubles as the index key.
        final String bodyText = new String(outgoing.getBody());
        msgBodys.addData(bodyText);
        originMsgs.addData(msg);
        originMsgIndex.put(bodyText, brokerResult);
    } catch (Exception failure) {
        if (isDebug) {
            failure.printStackTrace();
        }
        sendResult.setSendResult(false);
        sendResult.setSendException(failure);
        errorMsgs.addData(msg);
    }
    return sendResult;
}
代码示例来源:origin: apache/rocketmq
/**
 * Sends {@code msg} to the given queue through the producer and fills the shared
 * {@code sendResult} wrapper.
 *
 * <p>On success the elapsed time is added to {@code msgRTs}, the wrapper receives the message
 * id, whether the status equals {@code SEND_OK}, and the broker name, and the decoded body is
 * recorded in {@code msgBodys}, {@code originMsgs} and {@code originMsgIndex}. On any
 * exception the wrapper is marked failed with the exception attached and {@code msg} is added
 * to {@code errorMsgs}; the stack trace is printed only when {@code isDebug} is set.
 *
 * @param msg the message to send
 * @param mq the target message queue passed through to the producer
 * @return the shared {@code sendResult} wrapper, updated for this call
 */
public ResultWrapper sendMQ(Message msg, MessageQueue mq) {
    try {
        final long startedAt = System.currentTimeMillis();
        final org.apache.rocketmq.client.producer.SendResult brokerResult = producer.send(msg, mq);
        this.msgRTs.addData(System.currentTimeMillis() - startedAt);
        if (isDebug) {
            logger.info(brokerResult);
        }

        final boolean sentOk = brokerResult.getSendStatus().equals(SendStatus.SEND_OK);
        sendResult.setMsgId(brokerResult.getMsgId());
        sendResult.setSendResult(sentOk);
        sendResult.setBrokerIp(brokerResult.getMessageQueue().getBrokerName());

        // Body is decoded with the JVM default charset and doubles as the index key.
        final String bodyText = new String(msg.getBody());
        msgBodys.addData(bodyText);
        originMsgs.addData(msg);
        originMsgIndex.put(bodyText, brokerResult);
    } catch (Exception failure) {
        if (isDebug) {
            failure.printStackTrace();
        }
        sendResult.setSendResult(false);
        sendResult.setSendException(failure);
        errorMsgs.addData(msg);
    }
    return sendResult;
}
代码示例来源:origin: apache/rocketmq
/**
 * Creates a new {@link Message} carrying the topic, body, flag and properties of {@code msg}.
 *
 * <p>The body array and the properties map are passed on exactly as returned by the source
 * message's getters. NOTE(review): whether the constructor or setter copies them is not
 * visible here, so the new message may share them with the original; confirm before mutating.
 *
 * @param msg the message to copy from
 * @return the newly created message
 */
public static Message cloneMessage(final Message msg) {
    final Message copy = new Message(msg.getTopic(), msg.getBody());
    copy.setFlag(msg.getFlag());
    copy.setProperties(msg.getProperties());
    return copy;
}
代码示例来源:origin: apache/rocketmq
request.setBody(msg.getBody());
代码示例来源:origin: apache/rocketmq
/**
 * Attempts to replace the body of {@code msg} with its compressed form.
 *
 * <p>Compression is attempted only for non-batch messages whose body is present and at least
 * as long as the producer's configured threshold. A compression {@link IOException} is logged
 * and treated as "not compressed".
 *
 * @param msg the message whose body may be replaced
 * @return {@code true} only when the body was replaced by compressed data
 */
private boolean tryToCompressMessage(final Message msg) {
    // Batch messages do not support compression right now.
    if (msg instanceof MessageBatch) {
        return false;
    }

    final byte[] rawBody = msg.getBody();
    if (rawBody == null) {
        return false;
    }
    if (rawBody.length < this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
        return false;
    }

    try {
        final byte[] compressed = UtilAll.compress(rawBody, zipCompressLevel);
        if (compressed == null) {
            return false;
        }
        msg.setBody(compressed);
        return true;
    } catch (IOException compressFailure) {
        log.error("tryToCompressMessage exception", compressFailure);
        log.warn(msg.toString());
        return false;
    }
}
代码示例来源:origin: apache/rocketmq
/**
 * Prepares the trace context for a message that is about to be sent.
 *
 * <p>Nothing is recorded when {@code context} is {@code null} or when the message topic starts
 * with the dispatcher's trace topic name. Otherwise a {@code TraceContext} of type
 * {@code Pub} is attached to {@code context}, holding a single {@code TraceBean} filled from
 * the message and the send context.
 *
 * @param context the send context of the outgoing message; may be {@code null}
 */
@Override
public void sendMessageBefore(SendMessageContext context) {
    if (context == null) {
        return;
    }
    // Messages addressed to the trace topic are trace data themselves and are not recorded.
    final String messageTopic = context.getMessage().getTopic();
    final String traceTopic = ((AsyncTraceDispatcher) localDispatcher).getTraceTopicName();
    if (messageTopic.startsWith(traceTopic)) {
        return;
    }

    // Trace context attached to the send context for this publish.
    final TraceContext traceContext = new TraceContext();
    traceContext.setTraceBeans(new ArrayList<TraceBean>(1));
    context.setMqTraceContext(traceContext);
    traceContext.setTraceType(TraceType.Pub);
    traceContext.setGroupName(context.getProducerGroup());

    // Single data bean describing the outgoing message.
    final TraceBean bean = new TraceBean();
    bean.setTopic(context.getMessage().getTopic());
    bean.setTags(context.getMessage().getTags());
    bean.setKeys(context.getMessage().getKeys());
    bean.setStoreHost(context.getBrokerAddr());
    bean.setBodyLength(context.getMessage().getBody().length);
    bean.setMsgType(context.getMsgType());
    traceContext.getTraceBeans().add(bean);
}
代码示例来源:origin: apache/rocketmq
/**
 * Builds a {@code MessageExtBrokerInner} from {@code message}, targeted at the queue id of
 * {@code messageQueue}.
 *
 * <p>Topic, body, tags and properties are taken from {@code message}; born host and store
 * host are both set to {@code this.storeHost}, and the born timestamp is the current time.
 *
 * @param message the source of topic, body, tags and properties
 * @param messageQueue supplies the queue id
 * @return the populated inner message
 */
private MessageExtBrokerInner makeOpMessageInner(Message message, MessageQueue messageQueue) {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(message.getTopic());
msgInner.setBody(message.getBody());
msgInner.setQueueId(messageQueue.getQueueId());
// Tags are set first so the tags code below is computed from the value just stored.
msgInner.setTags(message.getTags());
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(msgInner.getTags()));
msgInner.setSysFlag(0);
// Properties are applied after the tags; NOTE(review): presumably this replaces the whole
// property set, so the statement order here should be kept as is - confirm.
MessageAccessor.setProperties(msgInner, message.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(message.getProperties()));
msgInner.setBornTimestamp(System.currentTimeMillis());
// The same host value is used for both born host and store host.
msgInner.setBornHost(this.storeHost);
msgInner.setStoreHost(this.storeHost);
msgInner.setWaitStoreMsgOK(false);
MessageClientIDSetter.setUniqID(msgInner);
return msgInner;
}
内容来源于网络,如有侵权,请联系作者删除!