ruby Kiba能否支持“Bulk”目的地而不是1 by 1?

tquggr8v  于 2023-10-18  发布在  Ruby
关注(0)|答案(1)|浏览(111)

我理解Kiba的核心是逐行处理。我想要这个,直到目标步骤。我想将转换后的数据推送到Kafka Topic中,但最好是批量推送,而不是单独推送。这可能吗?
假设我们还有下节课

class TransactionProducer
  def intialize(data: [])
    @data = data
  end

  def push_to_kafka
    $kafka.push(data)
  end
end

I think这是可能的,使用post_process并将转换后的数据存储在数组中。

data = []

job = Kiba.parse do
  source MySource, source_config

  transform do |row|
    row = Transform...
    data << row
  end

  post_process do
    TransactionProducer.new(data).push_to_kafka
  end
end

但我想知道是否有其他的方法?

baubqpgj

baubqpgj1#

虽然可以使用post_process,但我建议利用目的地可以实现close方法的事实(参见https://github.com/thbar/kiba/wiki/Implementing-ETL-destinations),并使用它来“缓冲”到目标(有点像这里的聚合转换https://thibautbarrere.com/2020/03/05/new-in-kiba-etl-v3)。
如果你有大量的行,你也可以同时使用writeclose来在达到给定的行数时刷新缓冲区(但是要确保刷新close调用中剩余的所有内容)。
希望这对你有帮助!

相关问题