我目前有一个spring批处理作业,只需一个步骤即可从oracle读取数据,并通过多个spring批处理处理器传递数据( CompositeItemProcessor
)并将数据写入不同的目标,如oracle和文件( CompositeItemWriter
) :
<batch:step id="dataTransformationJob">
<batch:tasklet transaction-manager="transactionManager" task-executor="taskExecutor" throttle-limit="30">
<batch:chunk reader="dataReader" processor="compositeDataProcessor" writer="compositeItemWriter" commit-interval="100"></batch:chunk>
</batch:tasklet>
</batch:step>
在上述步骤中 compositeItemWriter
配置了两个写入程序,它们一个接一个地运行,并将1亿条记录以及一个文件写入oracle。此外,还有 dataReader
具有同步读取方法,以确保多个线程不会从oracle读取相同的数据。从今天起,这项工作需要1小时30分钟才能完成。
我计划将上述工作分解为两部分,以便读卡器/处理器生成关于2个kafka主题的数据(一个用于将数据写入oracle,另一个用于将数据写入文件)。在等式的另一边,我将有一个具有两个并行流的作业,它们从每个主题读取数据,并分别将数据写入oracle和file。
考虑到上述体系结构,我想了解如何重构spring批处理作业以使用kafka。我认为以下方面是我需要解决的问题:
在现有的工作,不使用Kafka,我的油门限制是30;然而,当我在中间使用Kafka,如何决定正确的油门限制?
在现有作业中,提交间隔为100。这意味着 CompositeItemWriter
将为每100条记录调用一次,并且每个编写器将解压块并对其调用write方法。这是否意味着当我给Kafka写信时,会有100个公开电话打给Kafka?
在Kafka中,有没有一种方法可以将多行合并成一条消息,以避免多个网络呼叫?
在使用者方面,我希望有一个spring批处理多线程步骤,能够并行地读取每个分区中的一个主题。springbatch是否已经内置了类来支持这一点?
消费者将使用标准的jdbccbatchitemwriter或flatfileitemwriter来写入从kafka读取的数据,因此我认为这应该是标准的spring批处理。
注意:我知道kafka connect,但不想使用它,因为它需要设置一个connect集群,而且我没有可用的基础设施来支持它。
1条答案
按热度按时间fnatzsnv1#
回答您的问题:
您的Kafka生产商不需要节流,数据应在Kafka消费尽快可用。您的使用者可以根据实现进行限制(如果需要)。
Kafka生产者是可配置的。100条消息并不一定意味着100次网络呼叫。您可以将100条消息写入kafka producer(根据配置,它可以缓冲也可以不缓冲),然后刷新缓冲区以强制网络调用。这将导致(几乎)相同的现有行为。
一条消息可以包含多行,因为kafka消息的负载完全由您决定。但你的推理
multiple rows into one single message in Kafka to avoid multiple network calls?
无效,因为在单个网络调用中可以生成/使用多个消息(行)。对于你的初稿,我建议保持简单,让一行对应一条消息。据我所知不是(但这一点我可能错了)
是的,我相信他们应该工作得很好。