本文整理了Java中org.apache.rocketmq.common.message.Message.getKeys()
方法的一些代码示例,展示了Message.getKeys()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Message.getKeys()
方法的具体详情如下:
包路径:org.apache.rocketmq.common.message.Message
类名称:Message
方法名:getKeys
暂无
代码示例来源:origin: apache/storm
/**
* Generate Storm tuple values by Message and Scheme.
* @param msg RocketMQ Message
* @param scheme Scheme for deserializing
* @return tuple values
*/
public static List<Object> generateTuples(Message msg, Scheme scheme) {
List<Object> tup;
String rawKey = msg.getKeys();
ByteBuffer body = ByteBuffer.wrap(msg.getBody());
if (rawKey != null && scheme instanceof KeyValueScheme) {
ByteBuffer key = ByteBuffer.wrap(rawKey.getBytes(StandardCharsets.UTF_8));
tup = ((KeyValueScheme)scheme).deserializeKeyAndValue(key, body);
} else {
tup = scheme.deserialize(body);
}
return tup;
}
}
代码示例来源:origin: apache/rocketmq
@Override
public void sendMessageBefore(SendMessageContext context) {
//if it is message trace data,then it doesn't recorded
if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
return;
}
//build the context content of TuxeTraceContext
TraceContext tuxeContext = new TraceContext();
tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
context.setMqTraceContext(tuxeContext);
tuxeContext.setTraceType(TraceType.Pub);
tuxeContext.setGroupName(context.getProducerGroup());
//build the data bean object of message trace
TraceBean traceBean = new TraceBean();
traceBean.setTopic(context.getMessage().getTopic());
traceBean.setTags(context.getMessage().getTags());
traceBean.setKeys(context.getMessage().getKeys());
traceBean.setStoreHost(context.getBrokerAddr());
traceBean.setBodyLength(context.getMessage().getBody().length);
traceBean.setMsgType(context.getMsgType());
tuxeContext.getTraceBeans().add(traceBean);
}
代码示例来源:origin: apache/rocketmq
Assert.assertEquals(message.getTopic(), messageByOffset.getTopic());
Assert.assertEquals(message.getKeys(), messageByOffset.getKeys());
Assert.assertEquals(message.getKeys(), messageByMsgId.getKeys());
代码示例来源:origin: javahongxi/whatsmars
private SendResult sendOrderly(Message message) throws Exception {
SendResult sendResult = this.defaultMQProducer.send(message,
new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
long id = NumberUtils.toLong(String.valueOf(arg));
int index = (int) (id % mqs.size());
return mqs.get(index);
}
}, message.getKeys());
log.debug("send result: {}", sendResult);
return sendResult;
}
代码示例来源:origin: javahongxi/whatsmars
private static SendResult sendOrderly(String producerGroup, Message message, int sendMsgTimeout) throws Exception {
return getProducer(producerGroup, sendMsgTimeout).send(message,
new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
long id = NumberUtils.toLong(String.valueOf(arg));
int index = (int) (id % mqs.size());
return mqs.get(index);
}
}, message.getKeys());
}
代码示例来源:origin: maihaoche/rocketmq-spring-boot-starter
@Override
public void onException(Throwable e) {
//todo 对于发送失败的数据,如何保存,保证所有轨迹数据都记录下来
clientlog.info("send trace data failed ,the msgidSet is"+message.getKeys());
}
}, 5000);
代码示例来源:origin: didi/DDMQ
Assert.assertEquals(message.getTopic(), messageByOffset.getTopic());
Assert.assertEquals(message.getKeys(), messageByOffset.getKeys());
Assert.assertEquals(message.getKeys(), messageByMsgId.getKeys());
代码示例来源:origin: maihaoche/rocketmq-spring-boot-starter
/**
* 发送数据的接口
*
* @param keySet
* 本批次包含的keyset
* @param data
* 本批次的轨迹数据
*/
public void sendTraceDataByMQ(Set<String> keySet, String data) {
String topic = OnsTraceConstants.traceTopic;
final Message message = new Message(topic, data.getBytes());
message.setKeys(keySet);
try {
traceProducer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable e) {
//todo 对于发送失败的数据,如何保存,保证所有轨迹数据都记录下来
clientlog.info("send trace data failed ,the msgidSet is"+message.getKeys());
}
}, 5000);
}
catch (Exception e) {
clientlog.info("send trace data failed ,the msgidSet is"+message.getKeys());
}
}
代码示例来源:origin: org.apache.rocketmq/rocketmq-client
@Override
public void sendMessageBefore(SendMessageContext context) {
//if it is message trace data,then it doesn't recorded
if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
return;
}
//build the context content of TuxeTraceContext
TraceContext tuxeContext = new TraceContext();
tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
context.setMqTraceContext(tuxeContext);
tuxeContext.setTraceType(TraceType.Pub);
tuxeContext.setGroupName(context.getProducerGroup());
//build the data bean object of message trace
TraceBean traceBean = new TraceBean();
traceBean.setTopic(context.getMessage().getTopic());
traceBean.setTags(context.getMessage().getTags());
traceBean.setKeys(context.getMessage().getKeys());
traceBean.setStoreHost(context.getBrokerAddr());
traceBean.setBodyLength(context.getMessage().getBody().length);
traceBean.setMsgType(context.getMsgType());
tuxeContext.getTraceBeans().add(traceBean);
}
代码示例来源:origin: org.apache.rocketmq/rocketmq-spring-boot
public static org.springframework.messaging.Message convertToSpringMessage(
org.apache.rocketmq.common.message.Message message) {
org.springframework.messaging.Message retMessage =
MessageBuilder.withPayload(message.getBody()).
setHeader(RocketMQHeaders.KEYS, message.getKeys()).
setHeader(RocketMQHeaders.TAGS, message.getTags()).
setHeader(RocketMQHeaders.TOPIC, message.getTopic()).
setHeader(RocketMQHeaders.FLAG, message.getFlag()).
setHeader(RocketMQHeaders.TRANSACTION_ID, message.getTransactionId()).
setHeader(RocketMQHeaders.PROPERTIES, message.getProperties()).
build();
return retMessage;
}
内容来源于网络,如有侵权,请联系作者删除!