对于大数据中的许多情况,最好一次处理一小部分记录,而不是一次处理一条记录。
自然的例子是调用一些支持批处理的外部api来提高效率。
我们怎么能在Kafka的溪流里做到这一点?我在api中找不到任何与我想要的相似的东西。
到目前为止,我已经:
builder.stream[String, String]("my-input-topic")
.mapValues(externalApiCall).to("my-output-topic")
我想要的是:
builder.stream[String, String]("my-input-topic")
.batched(chunkSize = 2000).map(externalBatchedApiCall).to("my-output-topic")
在scala和akka流中,函数被调用 grouped
或者 batch
. 在spark结构化流媒体中我们可以 mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))
.
3条答案
按热度按时间8ljdwjyq1#
我怀疑,如果kafka流像其他工具一样支持固定大小的窗口。
但也有基于时间的窗口,由Kafka流支持。https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#windowing
可以随时间定义窗口大小,而不是记录数。
翻滚时间窗口
滑动时间窗
会话窗口
跳跃时间窗
在您的情况下,翻滚时间窗口可以是一个选项。这些是不重叠的,固定大小的时间窗口。
例如,大小为5000ms的翻滚窗口具有可预测的窗口边界[0;5000),[5000;10000),... — 而不是[1000;6000),[6000;11000),... 或者像[1452;6452),[6452;11452),....
acruukt92#
似乎还不存在。注意这个地方https://issues.apache.org/jira/browse/kafka-7432
r6hnlfcb3#
你可以排队。就像下面这样,