使用vertx将mongo集合流式传输到kafka主题的方法应该是什么?

23c0lvtd  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(309)

我有一个mongodb收藏。我想使用vert.x将这个集合的一个子集(基于一些查询)流式传输到一个kafka主题。
到目前为止,我已经为 KafkaWriteStream 这似乎适用于伪硬编码字符串。
不幸的是,我不知道如何从mongodb获得文档流,这些文档可以稍后使用专用的verticle流到kafka。
我该怎么做?有没有人有任何相关的链接或信息?

1mrurvl1

1mrurvl11#

vert.x mongo客户端模块允许 ReadStream 来自mongo查询。
当你拥有它的时候 KafkaWriteStream ,您可以开始从mongo读取数据并将其发送给kafka。
不过,要小心背压:如果mongo加载数据太快,不要让kafka客户端崩溃。你的算法应该是这样的:

readstream.handler(data -> {
  transformedData = transform(data); // your own transformations
  writestream.write(transformedData);
    if (writestream.writeQueueFull()) {
      readstream.pause();
      writestream.drainHandler(done -> {
        readstream.resume();
      });
    }
});

相关问题