我有一个mongodb收藏。我想使用vert.x将这个集合的一个子集(基于一些查询)流式传输到一个kafka主题。到目前为止,我已经为 KafkaWriteStream 这似乎适用于伪硬编码字符串。不幸的是,我不知道如何从mongodb获得文档流,这些文档可以稍后使用专用的verticle流到kafka。我该怎么做?有没有人有任何相关的链接或信息?
KafkaWriteStream
1mrurvl11#
vert.x mongo客户端模块允许 ReadStream 来自mongo查询。当你拥有它的时候 KafkaWriteStream ,您可以开始从mongo读取数据并将其发送给kafka。不过,要小心背压:如果mongo加载数据太快,不要让kafka客户端崩溃。你的算法应该是这样的:
ReadStream
readstream.handler(data -> { transformedData = transform(data); // your own transformations writestream.write(transformedData); if (writestream.writeQueueFull()) { readstream.pause(); writestream.drainHandler(done -> { readstream.resume(); }); } });
1条答案
按热度按时间1mrurvl11#
vert.x mongo客户端模块允许
ReadStream
来自mongo查询。当你拥有它的时候
KafkaWriteStream
,您可以开始从mongo读取数据并将其发送给kafka。不过,要小心背压:如果mongo加载数据太快,不要让kafka客户端崩溃。你的算法应该是这样的: