com.alibaba.rocketmq.common.message.Message.setTopic()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(4.1k)|赞(0)|评价(0)|浏览(422)

本文整理了Java中com.alibaba.rocketmq.common.message.Message.setTopic()方法的一些代码示例,展示了Message.setTopic()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Message.setTopic()方法的具体详情如下:
包路径:com.alibaba.rocketmq.common.message.Message
类名称:Message
方法名:setTopic

Message.setTopic介绍

暂无

代码示例

代码示例来源:origin: kuangye098/rocketmq

private static Message buildMessage(final int messageSize) throws UnsupportedEncodingException {
    Message msg = new Message();
    msg.setTopic("BenchmarkTest");

    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < messageSize; i += 10) {
      sb.append("hello baby");
    }

    msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));

    return msg;
  }
}

代码示例来源:origin: kuangye098/rocketmq

private static Message buildMessage(final String topic, final int messageSize) throws UnsupportedEncodingException {
    Message msg = new Message();
    msg.setTopic(topic);

    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < messageSize; i += 11) {
      sb.append("hello jodie");
    }
    msg.setBody(sb.toString().getBytes(MixAll.DEFAULT_CHARSET));
    return msg;
  }
}

代码示例来源:origin: kuangye098/rocketmq

private static Message buildMessage(final int messageSize) throws UnsupportedEncodingException {
    Message msg = new Message();
    msg.setTopic("BenchmarkTest");

    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < messageSize; i += 10) {
      sb.append("hello baby");
    }

    msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));

    return msg;
  }
}

代码示例来源:origin: beston123/Tarzan

private void checkMessage(Message message) throws MQClientException {
  if(message.getTopic() == null){
    message.setTopic(getTopic());
  }
  RocketMQValidators.checkMessage(message);
}

代码示例来源:origin: songxinjianqwe/EShop-SOA

@Transactional
@Override
public void check() {
  List<Long> all = mapper.findMessageIdsByStatusCreatedAfter(Arrays.asList(MessageStatus.UNCONSUMED, MessageStatus.CONSUME_FAILED), MQProducerConfig.CHECK_GAP);
  Message checkMessage = new Message();
  checkMessage.setTopic(config.getTopic());
  checkMessage.setTags(config.getCheckKeys());
  checkMessage.setBody(ProtoStuffUtil.serialize(all));
  try {
    producer.send(checkMessage);
  } catch (Exception e) {
    log.info("发送check消息失败,暂不做处理,不会影响数据一致性");
    e.printStackTrace();
  }
}

代码示例来源:origin: songxinjianqwe/EShop-SOA

@Override
public void commit(OrderDO order, String paymentPassword) {
  Message message = new Message();
  message.setTopic(config.getTopic());
  message.setBody(ProtoStuffUtil.serialize(order));
  TransactionSendResult result = null;
  try {
    result = this.producer.sendMessageInTransaction(message, executor, paymentPassword);
    log.info("事务消息发送结果:{}", result);
    log.info("TransactionState:{} ", result.getLocalTransactionState());
    // 因为无法获得executor中抛出的异常,只能模糊地返回订单支付失败信息。
    // TODO 想办法从executor中找到原生异常
  } catch (Exception e) {
    log.info("AccountService抛出异常...");
    e.printStackTrace();
  }
  if (result.getLocalTransactionState() == LocalTransactionState.ROLLBACK_MESSAGE) {
    throw new OrderPaymentException(order.getId());
  }
}

代码示例来源:origin: songxinjianqwe/EShop-SOA

@Transactional
@Override
public void reSend(List<ProducerTransactionMessageDO> messages) {
  for (ProducerTransactionMessageDO messageDO : messages) {
    if (messageDO.getSendTimes() == config.getRetryTimes()) {
      messageDO.setUpdateTime(LocalDateTime.now());
      messageDO.setMessageStatus(MessageStatus.OVER_CONFIRM_RETRY_TIME);
      mapper.updateByPrimaryKeySelective(messageDO);
      continue;
    }
    Message message = new Message();
    message.setTopic(config.getTopic());
    message.setBody(messageDO.getBody());
    try {
      SendResult result = producer.send(message);
      messageDO.setSendTimes(messageDO.getSendTimes() + 1);
      messageDO.setUpdateTime(LocalDateTime.now());
      mapper.updateByPrimaryKeySelective(messageDO);
      log.info("发送重试消息完毕,Message:{},result:{}", message, result);
    } catch (Exception e) {
      e.printStackTrace();
      log.info("发送重试消息时失败! Message:{}", message);
    }
  }
}

代码示例来源:origin: beston123/Tarzan

message.setTopic(Constants.TARZAN_TEST_TOPIC);
message.setTags(TestConstants.MESSAGE_TAG);
message.setKeys(messageKey);

代码示例来源:origin: songxinjianqwe/EShop-SOA

Map<Long, MessageStatus> result = messageService.findConsumerMessageStatuses(ids);
Message checkReply = new Message();
checkReply.setTopic(producerConfig.getTopic());
checkReply.setBody(ProtoStuffUtil.serialize(result));
producer.send(checkReply);

相关文章