我正在做一个有CDC概念的项目。它从数据库中读取更改并将事件推送到rabbitmq队列(它使用debezium)。
之后,我将使用KibaETL处理从该队列消费的事件消息。
KibaETL有三个主要过程
- 源代码(我从rabbitmq队列中获取,并正常工作)
- transform(我解析了json消息,并试图创建一个外部API来检索一些额外的数据。
- 目的地(我正在测试保存到本地文件)
奇怪的是,当我调用法拉第时,它会默默地退出kiba etl管道,它没有显示任何错误。
下面是我的transform类的代码片段。
#frozen_string_literal: true
require 'faraday'
require 'faraday/net_http'
require 'json'
require 'kiba'
class RetrieveJobDataTransform
def process(row)
puts "\t[transform] Retrieve Job Information by UUID - job_uuid: #{row}"
raise ArgumentError, 'row cannot be nil' if row.nil?
job_uuid = row.dig(:after_data, 'job_uuid')
api_result = make_api_call(job_uuid)
transformed_data = transform_data(row, api_result)
transformed_data
end
private
def make_api_call(job_uuid)
conn = create_connection('https://url.com')
**# before conn.get call, it shows output properly**
response = conn.get("api/jobs/#{job_uuid}")
**# here it doesnt show any output when I was debugging**
if response.success?
JSON.parse(response.body)
else
nil
end
end
def transform_data(row, api_result)
row = api_result
end
def create_connection(api_url)
Faraday.new(api_url) do |f|
f.headers['x-api-key'] = 'api key'
f.response :json
f.adapter :net_http
f.options.timeout = 120
end
end
end
字符串
和被调用来处理ETL的主类
job = Kiba.parse do
source KibaSource, bunny_adapter, queue_name
transform ParseMessageTransform
transform ValidateMessageTransform
**#This is the class that silently quit with no error or warnings**
transform RetrieveJobDataTransform
destination KibaDestination
end
puts '# job started - Running Kiba job... '
Kiba.run(job)
型
奇怪的行为是,如果我输入'irb'并添加该类代码,并在没有kiba etl管道内部的情况下调用它,它就可以正常工作,没有任何错误。
谁能给给予点建议?我被这个问题困住了,不知道该怎么办。
Thanks in advance
干杯!干杯!
2条答案
按热度按时间ymdaylpp1#
我会相当惊讶,如果它实际上是木叶有关(即使你可以得到的东西在IRB工作),由于木叶如何工作。
您是否愿意创建尽可能小的复制(也许使用单个文件的Bundler配置https://bundler.io/guides/bundler_in_a_single_file_ruby_script.html),在其中您将“存根”出源和目的地?
奇怪的事情可能会发生,因为你正在为每一行重新创建法拉第连接。
我也可能会尝试使用另一个HTTP客户端,例如HTTParty来进行您正在进行的调用。由于某些原因,您的调用和RabbitMQ调用之间可能会发生冲突!
很高兴能进一步帮助,如果你能通过gist.github.com发布最小的repro,我肯定能帮助你。
7xllpg7q2#
source KibaSource,bunny_adapter,queue_name
你好啊!谢谢你的帮助!我为源代码创建了一个存根,它工作得很好。所以与rabbitMQ调用有一些冲突。我认为是由于rabbit在消费事件及其上下文时在内部创建的线程。
我试图使用rabbitmq消费者来了解KibaSource上发生了什么,以避免错误。