java—如何使用kafka流处理数据块/批?

iyzzxitl  于 2021-06-06  发布在  Kafka
关注(0)|答案(3)|浏览(335)

对于大数据中的许多情况,最好一次处理一小部分记录,而不是一次处理一条记录。
自然的例子是调用一些支持批处理的外部api来提高效率。
我们怎么能在Kafka的溪流里做到这一点?我在api中找不到任何与我想要的相似的东西。
到目前为止,我已经:

builder.stream[String, String]("my-input-topic")
.mapValues(externalApiCall).to("my-output-topic")

我想要的是:

builder.stream[String, String]("my-input-topic")
.batched(chunkSize = 2000).map(externalBatchedApiCall).to("my-output-topic")

在scala和akka流中,函数被调用 grouped 或者 batch . 在spark结构化流媒体中我们可以 mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall)) .

8ljdwjyq

8ljdwjyq1#

我怀疑,如果kafka流像其他工具一样支持固定大小的窗口。
但也有基于时间的窗口,由Kafka流支持。https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#windowing
可以随时间定义窗口大小,而不是记录数。
翻滚时间窗口
滑动时间窗
会话窗口
跳跃时间窗
在您的情况下,翻滚时间窗口可以是一个选项。这些是不重叠的,固定大小的时间窗口。
例如,大小为5000ms的翻滚窗口具有可预测的窗口边界[0;5000),[5000;10000),... — 而不是[1000;6000),[6000;11000),... 或者像[1452;6452),[6452;11452),....

acruukt9

acruukt92#

似乎还不存在。注意这个地方https://issues.apache.org/jira/browse/kafka-7432

r6hnlfcb

r6hnlfcb3#

你可以排队。就像下面这样,

@Component
@Slf4j
public class NormalTopic1StreamProcessor extends AbstractStreamProcessor<String> {

    public NormalTopic1StreamProcessor(KafkaStreamsConfiguration configuration) {
        super(configuration);
    }

    @Override
    Topology buildTopology() {
        KStream<String, String> kStream = streamsBuilder.stream("normalTopic", Consumed.with(Serdes.String(), Serdes.String()));
        // .peek((key, value) -> log.info("message received by stream 0"));
        kStream.process(() -> new AbstractProcessor<String, String>() {
            final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(100);
            final List<String> collection = new ArrayList<>();

            @Override
            public void init(ProcessorContext context) {
                super.init(context);
                context.schedule(Duration.of(1, ChronoUnit.MINUTES), WALL_CLOCK_TIME, timestamp -> {
                    processQueue();
                    context().commit();
                });
            }

            @Override
            public void process(String key, String value) {
                queue.add(value);
                if (queue.remainingCapacity() == 0) {
                    processQueue();
                }
            }

            public void processQueue() {
                queue.drainTo(collection);
                long count = collection.stream().peek(System.out::println).count();
                if (count > 0) {
                    System.out.println("count is " + count);
                    collection.clear();
                }
            }
        });
        kStream.to("normalTopic1");
        return streamsBuilder.build();
    }

}

相关问题