ruby Kiba ETL和法拉第请求

gcuhipw9  于 9个月前  发布在  Ruby
关注(0)|答案(2)|浏览(115)

我正在做一个有CDC概念的项目。它从数据库中读取更改并将事件推送到rabbitmq队列(它使用debezium)。
之后,我将使用KibaETL处理从该队列消费的事件消息。
KibaETL有三个主要过程

  • 源代码(我从rabbitmq队列中获取,并正常工作)
  • transform(我解析了json消息,并试图创建一个外部API来检索一些额外的数据。
  • 目的地(我正在测试保存到本地文件)

奇怪的是,当我调用法拉第时,它会默默地退出kiba etl管道,它没有显示任何错误。
下面是我的transform类的代码片段。

#frozen_string_literal: true

require 'faraday'
require 'faraday/net_http'
require 'json'
require 'kiba'

class RetrieveJobDataTransform
  def process(row)
    puts "\t[transform] Retrieve Job Information by UUID - job_uuid: #{row}"
    raise ArgumentError, 'row cannot be nil' if row.nil?

    job_uuid = row.dig(:after_data, 'job_uuid')
    
    api_result = make_api_call(job_uuid)

    transformed_data = transform_data(row, api_result)

    transformed_data
  end

  private

  def make_api_call(job_uuid)    
    
    conn = create_connection('https://url.com')    
    
    **# before conn.get call, it shows output properly**
    response = conn.get("api/jobs/#{job_uuid}")
    **# here it doesnt show any output when I was debugging**

    if response.success?
      JSON.parse(response.body)
    else
      nil
    end
  end

  def transform_data(row, api_result)
    row = api_result
  end

  def create_connection(api_url)
    Faraday.new(api_url) do |f|
      f.headers['x-api-key'] = 'api key'
      f.response :json 
      f.adapter :net_http 
      f.options.timeout = 120
    end
  end
end

字符串
和被调用来处理ETL的主类

job = Kiba.parse do
  source KibaSource, bunny_adapter, queue_name

  transform ParseMessageTransform
  transform ValidateMessageTransform

  **#This is the class that silently quit with no error or warnings**
  transform RetrieveJobDataTransform 
  
  destination KibaDestination
end

puts '# job started - Running Kiba job... '
Kiba.run(job)


奇怪的行为是,如果我输入'irb'并添加该类代码,并在没有kiba etl管道内部的情况下调用它,它就可以正常工作,没有任何错误。
谁能给给予点建议?我被这个问题困住了,不知道该怎么办。
Thanks in advance
干杯!干杯!

ymdaylpp

ymdaylpp1#

我会相当惊讶,如果它实际上是木叶有关(即使你可以得到的东西在IRB工作),由于木叶如何工作。
您是否愿意创建尽可能小的复制(也许使用单个文件的Bundler配置https://bundler.io/guides/bundler_in_a_single_file_ruby_script.html),在其中您将“存根”出源和目的地?
奇怪的事情可能会发生,因为你正在为每一行重新创建法拉第连接。
我也可能会尝试使用另一个HTTP客户端,例如HTTParty来进行您正在进行的调用。由于某些原因,您的调用和RabbitMQ调用之间可能会发生冲突!
很高兴能进一步帮助,如果你能通过gist.github.com发布最小的repro,我肯定能帮助你。

7xllpg7q

7xllpg7q2#

source KibaSource,bunny_adapter,queue_name
你好啊!谢谢你的帮助!我为源代码创建了一个存根,它工作得很好。所以与rabbitMQ调用有一些冲突。我认为是由于rabbit在消费事件及其上下文时在内部创建的线程。
我试图使用rabbitmq消费者来了解KibaSource上发生了什么,以避免错误。

相关问题