我有下面的observable接收Kafka消费者记录并将其插入数据库。它目前正在工作,我可以接收消费者中预期的数据,并提取这些数据以执行一些Map并将其放入列表中。这个列表中的数据将被插入数据库。
按照现在的编写方式,它将尝试同时插入所有内容。Kafka的唱片有机会保持在10万到100万张之间。我正在寻找一种方法来打破这一点,这样我只需要从消费者记录1000个项目,插入到数据库中,并再次重复下一个1000个项目,并继续下去,直到记录是空的。这可能吗?
我试图使用take的变体,takeuntil和repeat,但它们不起作用。在我订阅之后,调用就结束了,当我这样做的时候,甚至没有进入可观察的。
我能得到一些关于如何写这篇文章的建议吗?这样我就可以从Kafka记录中提取1000条记录,将它们插入数据库,并一直这样做,直到完成所有Kafka记录?谢谢。
请注意,我使用的是rxjava1,需要坚持这个版本。
private final static AtomicInteger INSERT_COUNT = new AtomicInteger(1000);
private final static AtomicInteger RECORD_COUNT = new AtomicInteger();
private final static AtomicInteger REMAINDER = new AtomicInteger();
private final static AtomicInteger REPEAT_COUNT = new AtomicInteger();
public Observable<KafkaConsumerRecord<String, CustomObj>> dbInsert(KafkaConsumerRecords<String, CustomObj> records) {
return Observable.just(records.getDelegate().records())
// attempting to loop based on following counts. Not preferred but unsure of a better way.
// the figures captured here are correct.
// plus this doesn't currently matter anyway cos not able to get it to work using takeUntil, repeat.
.doOnSubscribe(() -> {
RECORD_COUNT.set(records.getDelegate().records().count());
REMAINDER.set(RECORD_COUNT.get() % INSERT_COUNT.get() == 0 ? 0 : 1);
REPEAT_COUNT.set((RECORD_COUNT.get() / INSERT_COUNT.get()) + REMAINDER.get());
})
.map(consumerRecords -> consumerRecords.records("Topic name"))
.map(it -> {
List<CustomRequest> requests = new ArrayList<>();
it.forEach(r -> {
ConsumerRecord<String, SomeObj> record = (ConsumerRecord<String, SomeObj>) r;
CustomRequest request = new CustomRequest (
new String(record.headers().headers("id").iterator().next().value(), StandardCharsets.UTF_8),
Long.parseLong(new String(record.headers().headers("code").iterator().next().value(), StandardCharsets.UTF_8)),
record.value()
);
requests.add(request);
});
return requests;
})
// nothing happens if I uncomment these.
// .takeUntil(customRequests -> customRequests.size() == INSERT_COUNT.get())
// .repeat(REPEAT_COUNT.get())
.doOnNext(customRequests -> {
// planning to do some db inserts here in a transaction of 1000 inserts at a time.
})
.doOnCompleted(() -> System.out.println("Completed"));
}
1条答案
按热度按时间bvn4nwqk1#
以下内容应适用于rxjava1.3.8
下面是输出-
我使用以下版本来测试上述代码-