我正在编写一个消费者来读取来自mq的分段消息。我对其他队列使用了springjms/spring集成。我知道ibmq不支持jms中的消息分段:(这里是相关问题)。如何在spring集成中组装mq消息段)
下面是我为java和spring提供的使用IBMMQ类的方法。
mq对象的bean定义。
@Bean
public MQGetMessageOptions mqGetMessageOptions() {
MQGetMessageOptions getOptions = new MQGetMessageOptions();
getOptions.waitInterval = CMQC.MQWI_UNLIMITED;
getOptions.options = CMQC.MQGMO_WAIT + CMQC.MQGMO_ALL_SEGMENTS_AVAILABLE + CMQC.MQGMO_LOGICAL_ORDER
+ CMQC.MQGMO_COMPLETE_MSG;
return getOptions;
}
@Bean
public MQQueueManager mqQueueManager() throws Exception {
Hashtable<String, Object> properties = new Hashtable<String, Object>();
properties.put(CMQC.CHANNEL_PROPERTY, channel);
properties.put(CMQC.HOST_NAME_PROPERTY, hostName);
properties.put(CMQC.PORT_PROPERTY, new Integer(port));
MQQueueManager qMgr = new MQQueueManager(queueManager, properties);
return qMgr;
}
@Bean
public MQQueue inboundQueue(@Autowired MQQueueManager mqQueueManager) throws Exception {
int openOptions = CMQC.MQOO_INPUT_EXCLUSIVE;
MQQueue inboundQueue = mqQueueManager.accessQueue(inboundQueue, openOptions);
return inboundQueue;
}
@Bean
public MessageChannel queueConsumerChannel() {
// return new DirectChannel();
return new ExecutorChannel(Executors.newFixedThreadPool(5));
}
消费者代码:
@Component
@Slf4j
public class MyQueueConsumer {
@Autowired
MQQueueManager qMgr;
@Autowired
MQGetMessageOptions mqGetMessageOptions;
@Autowired
MQQueue inboundQueue;
@Autowired
MessageChannel queueConsumerChannel;
@Autowired
MessageSaveService messageSaveService;
@EventListener(ApplicationReadyEvent.class)
public void consume() {
boolean getMore = true;
MQMessage receiveMsg = null;
while (getMore) {
try {
receiveMsg = new MQMessage();
log.info("Waiting to consume mesages from ....");
inboundQueue.get(receiveMsg, mqGetMessageOptions);
byte[] b = new byte[receiveMsg.getMessageLength()];
receiveMsg.readFully(b);
String fileName = getFileName();
Message<String> outMessage = MessageBuilder.withPayload(new String(b)).build();
queueConsumerChannel.send(outMessage);
log.info("Message consumed and sent to processng channel");
// qMgr.commit();
} catch (MQException e) {
if ((e.completionCode == CMQC.MQCC_WARNING) && (e.reasonCode == CMQC.MQRC_NO_MSG_AVAILABLE)) {
log.error("Bottom of the queue reached.");
getMore = false;
} else {
log.error("MQRead CC=" + e.completionCode + " : RC=" + e.reasonCode + " : EC=" + e.getErrorCode());
log.info("Is Connected :" + qMgr.isConnected());
log.info("Is open : " + qMgr.isOpen());
e.printStackTrace();
getMore = false;
}
}
}
}
@PreDestroy
public void closeMQObjects() {
System.out.println("Closing MQ objects ");
try {
if (inboundQueue != null)
inboundQueue.close();
} catch (MQException e) {
System.err.println("MQRead CC=" + e.completionCode + " : RC=" + e.reasonCode);
e.printStackTrace();
}
try {
if (qMgr != null)
qMgr.disconnect();
} catch (MQException e) {
System.err.println("MQRead CC=" + e.completionCode + " : RC=" + e.reasonCode);
e.printStackTrace();
}
}
}
通过此配置,使用者可以按需工作,它将所有分段的消息组合起来,作为一条完整的消息读取,并在队列中等待下一条消息到达。但我面临的挑战是我明白了 com.ibm.mq.MQException: MQJE001: Completion Code '2', Reason '2009'
每隔一段时间。我把旗子弄错了,让它从while循环中出来。到目前为止,我们还不能找出这个例外的确切原因。如何从该异常中恢复并继续等待队列并在消息到达时使用它们?我使用spring和ibmq的方法有什么缺陷吗
1条答案
按热度按时间puruo6ea1#
而不是定义
inboundQueue
作为bean,在while循环中分配它并close
当您成功地结束循环或遇到可重试的异常时,它会自动关闭。很遗憾,我不能给你一个可重试例外的清单,但是
2009
绝对是其中之一。延迟重试是一种很好的做法,例如,请参见“指数后退”。