我理解Kiba的核心是逐行处理。我想要这个,直到目标步骤。我想将转换后的数据推送到Kafka Topic中,但最好是批量推送,而不是单独推送。这可能吗?
假设我们还有下节课
class TransactionProducer
def intialize(data: [])
@data = data
end
def push_to_kafka
$kafka.push(data)
end
end
I think
这是可能的,使用post_process
并将转换后的数据存储在数组中。
data = []
job = Kiba.parse do
source MySource, source_config
transform do |row|
row = Transform...
data << row
end
post_process do
TransactionProducer.new(data).push_to_kafka
end
end
但我想知道是否有其他的方法?
1条答案
按热度按时间baubqpgj1#
虽然可以使用
post_process
,但我建议利用目的地可以实现close
方法的事实(参见https://github.com/thbar/kiba/wiki/Implementing-ETL-destinations),并使用它来“缓冲”到目标(有点像这里的聚合转换https://thibautbarrere.com/2020/03/05/new-in-kiba-etl-v3)。如果你有大量的行,你也可以同时使用
write
和close
来在达到给定的行数时刷新缓冲区(但是要确保刷新close
调用中剩余的所有内容)。希望这对你有帮助!