本文整理了Java中com.alibaba.rocketmq.common.message.Message.getKeys()
方法的一些代码示例,展示了Message.getKeys()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Message.getKeys()
方法的具体详情如下:
包路径:com.alibaba.rocketmq.common.message.Message
类名称:Message
方法名:getKeys
暂无
代码示例来源:origin: coffeewar/enode-master
@Override
public void onException(Throwable e) {
//todo 对于发送失败的数据,如何保存,保证所有轨迹数据都记录下来
clientlog.info("send trace data failed ,the msgidSet is" + message.getKeys());
}
}, 5000);
代码示例来源:origin: coffeewar/enode-master
@Override
public void onSuccess(SendResult sendResult) {
logger.info("ENode message async send success, keys:{}, sendResult: {}, routingKey: {}, messageId: {}, version: {}", message.getKeys(), sendResult, routingKey, messageId, version);
promise.complete(AsyncTaskResult.Success);
}
代码示例来源:origin: coffeewar/enode-master
@Override
public void onException(Throwable ex) {
logger.error("ENode message async send failed, keys:{}, routingKey: {}, messageId: {}, version: {}", message.getKeys(), routingKey, messageId, version);
promise.complete(new AsyncTaskResult(AsyncTaskStatus.IOException, ex.getMessage()));
}
});
代码示例来源:origin: coffeewar/enode-master
public CompletableFuture<AsyncTaskResult> sendMessageAsync(Producer producer, Message message, String routingKey, String messageId, String version) {
CompletableFuture<AsyncTaskResult> promise = new CompletableFuture<>();
logger.info("============= send rocketmq message,keys:{},messageid: {},routingKey: {}", message.getKeys(), messageId, routingKey);
try {
producer.send(message, this::messageQueueSelect, routingKey, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
logger.info("ENode message async send success, keys:{}, sendResult: {}, routingKey: {}, messageId: {}, version: {}", message.getKeys(), sendResult, routingKey, messageId, version);
promise.complete(AsyncTaskResult.Success);
}
@Override
public void onException(Throwable ex) {
logger.error("ENode message async send failed, keys:{}, routingKey: {}, messageId: {}, version: {}", message.getKeys(), routingKey, messageId, version);
promise.complete(new AsyncTaskResult(AsyncTaskStatus.IOException, ex.getMessage()));
}
});
} catch (Exception ex) {
logger.error(String.format("ENode message async send has exception,keys:%s,message: %s, routingKey: %s, messageId: %s, version: %s", message.getKeys(), message, routingKey, messageId, version), ex);
promise.complete(new AsyncTaskResult(AsyncTaskStatus.IOException, ex.getMessage()));
}
return promise;
}
代码示例来源:origin: coffeewar/enode-master
/**
* 发送数据的接口
*
* @param keySet 本批次包含的keyset
* @param data 本批次的轨迹数据
*/
public void sendTraceDataByMQ(Set<String> keySet, String data) {
String topic = OnsTraceConstants.traceTopic + currentRegionId;
final Message message = new Message(topic, data.getBytes());
message.setKeys(keySet);
try {
traceProducer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable e) {
//todo 对于发送失败的数据,如何保存,保证所有轨迹数据都记录下来
clientlog.info("send trace data failed ,the msgidSet is" + message.getKeys());
}
}, 5000);
} catch (Exception e) {
clientlog.info("send trace data failed ,the msgidSet is" + message.getKeys());
}
}
代码示例来源:origin: beston123/Tarzan
LOGGER.error("准备消息 '{}' 失败, {}", message.getKeys(), result.getErrorMsg());
return;
LOGGER.info("准备消息 '" + message.getKeys() + "' 成功, 事务Id=" + tid);
LOGGER.info("本地事务处理成功,提交消息'" + message.getKeys() + "'.");
testMessageNotifier.commitMessage(tid, message);
}else {
LOGGER.info("本地事务处理失败,回滚消息'" + message.getKeys() + "'.");
testMessageNotifier.rollbackMessage(tid);
代码示例来源:origin: beston123/Tarzan
private RocketMQBody buildMQBody(Message message){
RocketMQBody mqBody = new RocketMQBody();
mqBody.setProducerGroup(getGroupId());
mqBody.setTopic(getTopic());
mqBody.setTags(message.getTags());
mqBody.setMessageKey(message.getKeys());
mqBody.setMessageBody(message.getBody());
return mqBody;
}
代码示例来源:origin: coffeewar/enode-master
@Override
public void sendMessageBefore(SendMessageContext context) {
// 如果是消息轨迹本身的发送链路,则不需要再记录
if (context == null || context.getMessage().getTopic().startsWith(MixAll.SYSTEM_TOPIC_PREFIX)) {
return;
}
OnsTraceContext onsContext = new OnsTraceContext();
onsContext.setTraceBeans(new ArrayList<OnsTraceBean>(1));
context.setMqTraceContext(onsContext);
onsContext.setTraceType(OnsTraceType.Pub);
onsContext.setGroupName(context.getProducerGroup());
OnsTraceBean traceBean = new OnsTraceBean();
traceBean.setTopic(context.getMessage().getTopic());
traceBean.setTags(context.getMessage().getTags());
traceBean.setKeys(context.getMessage().getKeys());
traceBean.setStoreHost(context.getBrokerAddr());
traceBean.setBodyLength(context.getMessage().getBody().length);
traceBean.setMsgType(context.getMsgType());
onsContext.getTraceBeans().add(traceBean);
}
内容来源于网络,如有侵权,请联系作者删除!