目前,我正在使用sidekiq在rails应用程序的后台工作。现在我想用Kafka来做同样的事情,使用消费者-生产者机制。我用的是 ruby-kafka
异步生成消息。
我的方法:在 perform
方法 MyNewJob < ApplicationJob
我要找的工作 produce
Kafka的方法与时机 consumer
收到消息后,我在一个新的类方法中执行实际的代码段(以前在perform方法中执行) MyNewJob
.
它正在按预期工作。我想确认一下,这是不是正确的方法?如果不是,那么什么是正确的方法?
伪代码:
我会用你的任何一个打电话给我的工作 MyNewJob.perform_now
或者 MyNewJob.perform_later
等。
我的新工作.rb
class MyNewJob < ApplicationJob
queue_as TestMixin::QUEUE_NAME
rescue_from(App::ServerError) do
retry_job wait: Rails.application.config.retry_interval
end
def perform(paylaod)
#producing message asynchronously
producer = kafka.async_producer
producer.produce(payload, topic: "topic_name")
end
def self.run_job(payload)
#code to executed previously written inside perform method above.
end
end
consumer.rb(伪代码)
# client consumer topic subscribe logic and then start consuming.
consumer.each do |message|
MyNewJob.run_job(message.payload)
end
我也在考虑另一种方法,我不带 ApplicationJob
类存在并直接在kafka中生成消息,而不是调用job。
请确认。
编辑1:我正在使用 ruby-kafka
使用异步生产者( kafka.async_producer
).
暂无答案!
目前还没有任何答案,快来回答吧!