下面是etl.xml中作业的配置
<batch:job id=“procuerjob”>
<batch:step id="Produce">
<batch:partition partitioner="partitioner">
<batch:handler grid-size="${ partitioner.limit}"></batch:handler>
<batch:step>
<batch:tasklet>
<batch:chunk reader="Reader" writer="kafkaProducer"
commit-interval="20000">
</batch:chunk>
<batch:listeners>
<batch:listener ref="producingListener" />
</batch:listeners>
</batch:tasklet>
</batch:step>
</batch:partition>
</batch:step>
</batch:job>
下面是用于向主题发送消息的代码。
listenablefuture<sendresult<string,message>>listenablefuture=kafkatemplate.send(message);
addcallback(新listenablefuturecallback<sendresult<string,message>>(){
@Override
public void onSuccess(SendResult<String, message > result) {
log.info("marking as SUCCESS");
manager.updateStatus(“someTable”, KafkaResponse.SUCCESS);
}
@Override
public void onFailure(Throwable ex) {
log.info("marking as FAILURE");
manager.updateKafkaStatus(someTable, KafkaResponse.FAILURE);
}
}
一旦kafkatemplate.send(message)被执行,监听器就会被调用,作业就完成了。我看到onsuccess()、onfailure()在作业完成后被调用。如何更改作业的配置,以便在收到kafka主题的确认后调用侦听器?
1条答案
按热度按时间a64a0gku1#
你想给我一些你建议的代码例子来阻止等待未来。那也许会有帮助。
我没有尝试以下方法,但我的想法是: