alpakka kafka无法定义批处理

u3r8eeie  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(420)
// #atLeastOnceBatch
        Consumer.Control control =
            Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
                .mapAsync(1, msg ->
                    business(msg.record().key(), msg.record().value())
                            .thenApply(done -> msg.committableOffset())
                )
                .batch(
                    20,
                    ConsumerMessage::createCommittableOffsetBatch,
                    ConsumerMessage.CommittableOffsetBatch::updated
                )
                .mapAsync(3, c -> c.commitJavadsl())
                .to(Sink.ignore())
                .run(materializer);
        // #atLeastOnceBatch

我试图测试alpakkafka连接器至少一次批处理示例,我得到以下编译时错误
consumermessage类型未定义适用于此处的createcommittableoffsetbatch(consumermessage.committableoffset)
并且类型consumermessage.committableoffsetbatch未定义适用于此处的更新(s,consumermessage.committableoffset)

ppcbkaq5

ppcbkaq51#

这些版本在V0.22中提供。不幸的是,与akka文档相比,alpakka文档缺少一点。

相关问题