我有两个 Mix 项目:一个叫 server,负责向 Kafka 发布待计算阶乘的数字;另一个是消费者,应该计算这些阶乘。但是当我启动消费者时,它会不断崩溃。
server.exs
defmodule Server do
  @moduledoc """
  Publishes random integers to a Kafka topic so that a separate consumer can
  pick them up and compute their factorials.
  """

  alias KafkaEx.Protocol.CreateTopics.TopicRequest
  alias KafkaEx.Protocol.Produce.Message
  alias KafkaEx.Protocol.Produce.Request

  # Single source of truth for the topic name used by every function below.
  @topic "factorials-to-be-calculated"

  @doc """
  Creates the `#{@topic}` topic with one partition and a replication factor of 1.

  Returns whatever `KafkaEx.create_topics/1` returns.
  """
  def create_topic do
    KafkaEx.create_topics([
      %TopicRequest{topic: @topic, num_partitions: 1, replication_factor: 1}
    ])
  end

  @doc """
  Deletes the `#{@topic}` topic.

  Returns whatever `KafkaEx.delete_topics/1` returns.
  """
  def delete_topic do
    # delete_topics/1 expects a list of topic names, not a bare string.
    KafkaEx.delete_topics([@topic])
  end

  @doc """
  Picks a random integer in `(min, max]`, prints it, and publishes it (as a
  decimal string) to the `#{@topic}` topic.

  `max - min` must be a positive integer, because it is passed to
  `:rand.uniform/1`.

  Returns `{:ok, offset}` as reported by `KafkaEx.produce/1`; any other reply
  raises a `MatchError`.
  """
  def generate_number(max, min \\ 0) do
    number = :rand.uniform(max - min) + min
    message = %Message{value: Integer.to_string(number)}
    IO.puts(number)

    request = %Request{topic: @topic, required_acks: 1, messages: [message]}

    # Assert on the success shape; the match itself is the return value.
    {:ok, _offset} = KafkaEx.produce(request)
  end
end
factorial_consumer.exs
defmodule Consumer.FactorialConsumer do
  @moduledoc """
  `KafkaEx.GenConsumer` implementation that logs every message value it
  receives, and exposes a `factorial/1` helper.
  """

  use KafkaEx.GenConsumer

  require Logger

  alias KafkaEx.Protocol.Fetch.Message

  @doc """
  Logs the value of every `KafkaEx.Protocol.Fetch.Message` in `message_set` at
  debug level and asks for an asynchronous offset commit.

  Elements that are not `Message` structs are skipped.
  Returns `{:async_commit, state}` with `state` unchanged.
  """
  @impl true
  def handle_message_set(message_set, state) do
    Enum.each(message_set, fn
      %Message{value: value} ->
        Logger.debug(fn -> "message: " <> inspect(value) end)

      _other ->
        :ok
    end)

    {:async_commit, state}
  end

  @doc """
  Returns `n!` for a non-negative integer `n`.

  Raises `FunctionClauseError` for negative or non-integer input, instead of
  recursing without end.
  """
  @spec factorial(non_neg_integer()) :: pos_integer()
  def factorial(n) when is_integer(n) and n >= 0, do: do_factorial(n, 1)

  # Tail-recursive accumulator loop so large inputs do not grow the stack.
  defp do_factorial(0, acc), do: acc
  defp do_factorial(n, acc), do: do_factorial(n - 1, n * acc)
end
application.exs(消费者)
defmodule Consumer.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  # Application callback: starts a top-level supervisor whose only child is a
  # KafkaEx consumer group running Consumer.FactorialConsumer against the
  # "factorials-to-be-calculated" topic.
  # Returns the result of Supervisor.start_link/2.
  @impl true
  def start(_type, _args) do
    gen_consumer_impl = Consumer.FactorialConsumer
    consumer_group_name = "Factorials"
    topic_names = ["factorials-to-be-calculated"]
    consumer_group_opts = []

    children = [
      # Explicit child-spec map in place of the deprecated
      # Supervisor.Spec.supervisor/2 helper. `type: :supervisor` keeps the
      # child treated as a supervisor, so its shutdown defaults to :infinity.
      %{
        id: KafkaEx.ConsumerGroup,
        start:
          {KafkaEx.ConsumerGroup, :start_link,
           [gen_consumer_impl, consumer_group_name, topic_names, consumer_group_opts]},
        type: :supervisor
      }
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Consumer.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
这是我运行 `iex -S mix run` 时遇到的错误:
谢谢你能给我的任何帮助
编辑:我正在使用的库(KafkaEx)的文档链接:https://hexdocs.pm/kafka_ex/KafkaEx.html
纯文本堆栈跟踪:
17:07:13.790 [error] GenServer #PID<0.220.0> terminating
** (CaseClauseError) no case clause matching: {:error, {{:EXIT, {{:case_clause, {:error, {:undef, [{Consumer.FactorialConsumer, :init, ["factorials-to-be-calculated", 0, nil], []}, {KafkaEx.GenConsumer, :init, 1, [file: 'lib/kafka_ex/gen_consumer.ex', line: 545]}, {:gen_server, :init_it, 2, [file: 'gen_server.erl', line: 417]}, {:gen_server, :init_it, 6, [file: 'gen_server.erl', line: 385]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 226]}]}}}, [{KafkaEx.GenConsumer.Supervisor, :"-start_workers/3-fun-0-", 3, [file: 'lib/kafka_ex/gen_consumer/supervisor.ex', line: 100]}, {Enum, :"-each/2-lists^foreach/1-0-", 2, [file: 'lib/enum.ex', line: 786]}, {KafkaEx.GenConsumer.Supervisor, :start_workers, 3, [file: 'lib/kafka_ex/gen_consumer/supervisor.ex', line: 99]}, {KafkaEx.GenConsumer.Supervisor, :start_link, 4, [file: 'lib/kafka_ex/gen_consumer/supervisor.ex', line: 57]}, {:supervisor, :do_start_child_i, 3, [file: 'supervisor.erl', line: 385]}, {:supervisor, :do_start_child, 2, [file: 'supervisor.erl', line: 371]}, {:supervisor, :handle_start_child, 2, [file: 'supervisor.erl', line: 677]}, {:supervisor, :handle_call, 3, [file: 'supervisor.erl', line: 426]}]}}, {:child, :undefined, :consumer, {KafkaEx.GenConsumer.Supervisor, :start_link, [{KafkaEx.GenConsumer, Consumer.FactorialConsumer}, "Factorials", [{"factorials-to-be-calculated", 0}], [commit_interval: 1000, generation_id: 224, member_id: "kafka_ex-dee44079-8cae-4432-9926-7d35f7d8c7dd"]]}, :permanent, :infinity, :supervisor, [KafkaEx.GenConsumer.Supervisor]}}}
    (kafka_ex 0.11.0) lib/kafka_ex/consumer_group.ex:340: KafkaEx.ConsumerGroup.start_consumer/5
    (kafka_ex 0.11.0) lib/kafka_ex/consumer_group/manager.ex:479: KafkaEx.ConsumerGroup.Manager.start_consumer/2
    (kafka_ex 0.11.0) lib/kafka_ex/consumer_group/manager.ex:204: KafkaEx.ConsumerGroup.Manager.handle_info/2
    (stdlib 3.13.2) gen_server.erl:680: :gen_server.try_dispatch/4
    (stdlib 3.13.2) gen_server.erl:756: :gen_server.handle_msg/6
    (stdlib 3.13.2) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message: {:EXIT, #PID<0.224.0>, {:shutdown, :rebalance}}
State: %KafkaEx.ConsumerGroup.Manager.State{assignments: [], consumer_module: Consumer.FactorialConsumer, consumer_opts: [commit_interval: 1000], consumer_supervisor_pid: #PID<0.225.0>, gen_consumer_module: KafkaEx.GenConsumer, generation_id: 223, group_name: "Factorials", heartbeat_interval: 1000, heartbeat_timer: #PID<0.224.0>, leader_id: "kafka_ex-dee44079-8cae-4432-9926-7d35f7d8c7dd", member_id: "kafka_ex-dee44079-8cae-4432-9926-7d35f7d8c7dd", members: nil, partition_assignment_callback: &KafkaEx.ConsumerGroup.PartitionAssignment.round_robin/2, session_timeout: 30000, session_timeout_padding: 10000, supervisor_pid: #PID<0.219.0>, topics: ["factorials-to-be-calculated"], worker_names: [#PID<0.221.0>]}
1条答案
按热度按时间5us2dqdw1#
我通过重新创建一个新的 Mix 项目解决了这个问题。