我试图使用partition_by
选项在broadway,使同一分区的消息去同一处理器.我实际上有20处理器和消息数据是从0到9
....
processors: [
default: [
max_demand: 50,
concurrency: 20
]
],
partition_by: &partition/1
)
end
defp partition(msg) do
msg.data
end
def handle_message(_processor, msg, _ctx) do
Logger.info "pid #{inspect(self())}"
...
msg
end
奇怪的是,当我在handle消息回调中添加一些日志来监视处理器PID时,我总是为所有传入的消息获得相同的处理器PID。但是当我删除partition_by
行时,我有不同的处理器PID。你知道为什么分区不起作用吗?
1条答案
按热度按时间whlutmcx1#
我刚刚自己在玩百老汇的分区,它和预期的一样好用,具有相同分区键的消息最终位于相同的生产者示例上(由PID标识)。从我在代码中看到的,不清楚从
partition
方法返回什么。您需要注意,应该从那里返回一个数字。如果你的message.data
是一个字符串或其他东西,那么你可能会在同一个分区中结束。如果你的消息中没有一个数字可以用于分区,你可以帮助自己使用内置的erlang方法并哈希字符串(:erlang.phash2/1
)。