java—如何循环和限制每次在可观察对象中获取的项目数

6ss1mwsb  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(248)

我有下面的observable接收Kafka消费者记录并将其插入数据库。它目前正在工作,我可以接收消费者中预期的数据,并提取这些数据以执行一些Map并将其放入列表中。这个列表中的数据将被插入数据库。
按照现在的编写方式,它将尝试同时插入所有内容。Kafka的唱片有机会保持在10万到100万张之间。我正在寻找一种方法来打破这一点,这样我只需要从消费者记录1000个项目,插入到数据库中,并再次重复下一个1000个项目,并继续下去,直到记录是空的。这可能吗?
我试图使用take的变体,takeuntil和repeat,但它们不起作用。在我订阅之后,调用就结束了,当我这样做的时候,甚至没有进入可观察的。
我能得到一些关于如何写这篇文章的建议吗?这样我就可以从Kafka记录中提取1000条记录,将它们插入数据库,并一直这样做,直到完成所有Kafka记录?谢谢。
请注意,我使用的是rxjava1,需要坚持这个版本。

private final static AtomicInteger INSERT_COUNT = new AtomicInteger(1000);
private final static AtomicInteger RECORD_COUNT = new AtomicInteger();
private final static AtomicInteger REMAINDER = new AtomicInteger();
private final static AtomicInteger REPEAT_COUNT = new AtomicInteger();

public Observable<KafkaConsumerRecord<String, CustomObj>> dbInsert(KafkaConsumerRecords<String, CustomObj> records) {

    return Observable.just(records.getDelegate().records())
            // attempting to loop based on following counts. Not preferred but unsure of a better way.
            // the figures captured here are correct.
            // plus this doesn't currently matter anyway cos not able to get it to work using takeUntil, repeat. 
            .doOnSubscribe(() -> {
                RECORD_COUNT.set(records.getDelegate().records().count());
                REMAINDER.set(RECORD_COUNT.get() % INSERT_COUNT.get() == 0 ? 0 : 1);
                REPEAT_COUNT.set((RECORD_COUNT.get() / INSERT_COUNT.get()) + REMAINDER.get());
            })
            .map(consumerRecords -> consumerRecords.records("Topic name"))
            .map(it -> {
                List<CustomRequest> requests = new ArrayList<>();
                it.forEach(r -> {
                    ConsumerRecord<String, SomeObj> record = (ConsumerRecord<String, SomeObj>) r;
                    CustomRequest request = new CustomRequest (
                        new String(record.headers().headers("id").iterator().next().value(), StandardCharsets.UTF_8),
                        Long.parseLong(new String(record.headers().headers("code").iterator().next().value(), StandardCharsets.UTF_8)),
                        record.value()
                    );
                    requests.add(request);
                });
                return requests;
            })
            // nothing happens if I uncomment these. 
            // .takeUntil(customRequests -> customRequests.size() == INSERT_COUNT.get())
            // .repeat(REPEAT_COUNT.get())
            .doOnNext(customRequests -> {
                // planning to do some db inserts here in a transaction of 1000 inserts at a time. 
            })
            .doOnCompleted(() -> System.out.println("Completed"));
}
bvn4nwqk

bvn4nwqk1#

以下内容应适用于rxjava1.3.8

rx.Observable.from(List.of(1, 2, 3, 4, 5, 6))
  .buffer(2)
  .doOnNext(r -> System.out.println(r))
  .subscribe();

下面是输出-

[1, 2]
[3, 4]
[5, 6]

我使用以下版本来测试上述代码-

<dependency>
  <groupId>io.reactivex</groupId>
  <artifactId>rxjava</artifactId>
  <version>1.3.8</version>
</dependency>

相关问题