来自ibmq的spark流数据

lnvxswe2  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(369)

我想从ibmq流式传输数据。我已经试用了在github上找到的代码。
我可以从队列流式传输数据,但每次流式传输时,它都会从队列中获取所有数据。我只想获取推送到队列中的当前数据。我查了很多网站,但没有找到正确的解决办法。
在Kafka,我们有类似 KafkaStreamUtils 用于流式传输近实时数据。是否有类似于ibmq的内容,以便只流式传输最新的数据?

wwwo4jvm

wwwo4jvm1#

您提供的链接中的示例显示,它调用以下方法从ibm mq接收:

CustomMQReciever(String host , int port, String qm, String channel, String qn)

如果您在这里查看custommqreciever,您可以看到它只是浏览队列中的消息。这意味着消息仍将在队列中,下次连接时将收到相同的消息:

MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue);

如果要从队列中删除消息,则需要调用从队列中使用消息的方法,而不是从队列中浏览消息。下面是更改的示例 CustomMQReciever.java 这应该能达到你的目的:
initConnection() 将上述代码更改为以下代码,以使其从队列中删除消息:

MQMessageConsumer consumer = (MQMessageConsumer) qSession.createConsumer(queue);

摆脱:

enumeration= browser.getEnumeration();

低于 receive() 更改以下内容:

while (!isStopped() && enumeration.hasMoreElements() )
    {

    receivedMessage= (JMSMessage) enumeration.nextElement();
    String userInput = convertStreamToString(receivedMessage);
    //System.out.println("Received data :'" + userInput + "'");
    store(userInput);
    }

对这样的事情:

while (!isStopped() && (receivedMessage = consumer.receiveNoWait()) != null))
    {
    String userInput = convertStreamToString(receivedMessage);
    //System.out.println("Received data :'" + userInput + "'");
    store(userInput);
    }

相关问题