我想从ibmq流式传输数据。我已经试用了在github上找到的代码。我可以从队列流式传输数据,但每次流式传输时,它都会从队列中获取所有数据。我只想获取推送到队列中的当前数据。我查了很多网站,但没有找到正确的解决办法。在Kafka,我们有类似 KafkaStreamUtils 用于流式传输近实时数据。是否有类似于ibmq的内容,以便只流式传输最新的数据?
KafkaStreamUtils
wwwo4jvm1#
您提供的链接中的示例显示,它调用以下方法从ibm mq接收:
CustomMQReciever(String host , int port, String qm, String channel, String qn)
如果您在这里查看custommqreciever,您可以看到它只是浏览队列中的消息。这意味着消息仍将在队列中,下次连接时将收到相同的消息:
MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue);
如果要从队列中删除消息,则需要调用从队列中使用消息的方法,而不是从队列中浏览消息。下面是更改的示例 CustomMQReciever.java 这应该能达到你的目的:在 initConnection() 将上述代码更改为以下代码,以使其从队列中删除消息:
CustomMQReciever.java
initConnection()
MQMessageConsumer consumer = (MQMessageConsumer) qSession.createConsumer(queue);
摆脱:
enumeration= browser.getEnumeration();
低于 receive() 更改以下内容:
receive()
while (!isStopped() && enumeration.hasMoreElements() ) { receivedMessage= (JMSMessage) enumeration.nextElement(); String userInput = convertStreamToString(receivedMessage); //System.out.println("Received data :'" + userInput + "'"); store(userInput); }
对这样的事情:
while (!isStopped() && (receivedMessage = consumer.receiveNoWait()) != null)) { String userInput = convertStreamToString(receivedMessage); //System.out.println("Received data :'" + userInput + "'"); store(userInput); }
1条答案
按热度按时间wwwo4jvm1#
您提供的链接中的示例显示,它调用以下方法从ibm mq接收:
如果您在这里查看custommqreciever,您可以看到它只是浏览队列中的消息。这意味着消息仍将在队列中,下次连接时将收到相同的消息:
如果要从队列中删除消息,则需要调用从队列中使用消息的方法,而不是从队列中浏览消息。下面是更改的示例
CustomMQReciever.java
这应该能达到你的目的:在
initConnection()
将上述代码更改为以下代码,以使其从队列中删除消息:摆脱:
低于
receive()
更改以下内容:对这样的事情: