如何使用quarkus+kafka+smallrye处理流处理异常?
我的代码与quarkus指南上的命令生产者示例非常相似(https://quarkus.io/guides/kafka#imperative-用法)
import io.smallrye.reactive.messaging.annotations.Channel;
import io.smallrye.reactive.messaging.annotations.Emitter;
import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Consumes;
import javax.ws.rs.core.MediaType;
@Path("/prices")
public class PriceResource {
@Inject @Channel("price-create") Emitter<Double> priceEmitter;
@POST
@Consumes(MediaType.TEXT_PLAIN)
public void addPrice(Double price) {
priceEmitter.send(price);
}
}
我想要类似于vanilla kafka库的东西,它提供了处理请求发送的每条记录的回调的选项。
ProducerRecord<String, String> record = new ProducerRecord<>("topic-name", key, value);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
logger.info(record.toString());
if (exception != null) {
logger.error("Producer exception", exception);
}
}
});
tks公司
1条答案
按热度按时间c7rzv4ha1#
文件中有一部分是关于承认的