Kafka Elixir 消费者不断崩溃

carvr3hs  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(314)

我有两个 mix 项目:一个叫做 server,它向 Kafka 发布待计算阶乘的数字;另一个是负责计算这些阶乘的消费者。但当我启动消费者时,它会不断崩溃。
服务器.exs

defmodule Server do
  @moduledoc """
  Publishes random integers to a Kafka topic so that a separate consumer
  can pick them up and compute their factorials.
  """

  alias KafkaEx.Protocol.CreateTopics.TopicRequest
  alias KafkaEx.Protocol.Produce.Message
  alias KafkaEx.Protocol.Produce.Request

  # Single source of truth for the topic name used by every function below.
  @topic "factorials-to-be-calculated"

  @doc """
  Creates the `#{@topic}` topic with one partition and a replication factor of 1.

  Returns whatever `KafkaEx.create_topics/1` returns.
  """
  def create_topic do
    KafkaEx.create_topics([
      %TopicRequest{topic: @topic, num_partitions: 1, replication_factor: 1}
    ])
  end

  @doc """
  Deletes the `#{@topic}` topic.

  Returns whatever `KafkaEx.delete_topics/1` returns.
  """
  def delete_topic do
    # delete_topics expects a list of topic names, not a bare string.
    KafkaEx.delete_topics([@topic])
  end

  @doc """
  Picks a random integer, prints it, and publishes it to `#{@topic}`.

  `:rand.uniform/1` returns a value in `1..n`, so the generated number lies in
  `(min + 1)..max`; `max` must be greater than `min`.

  Returns `{:ok, offset}` on success and raises `MatchError` if the produce
  call returns anything else.
  """
  def generate_number(max, min \\ 0) do
    number = :rand.uniform(max - min) + min
    IO.puts(number)

    request = %Request{
      topic: @topic,
      required_acks: 1,
      messages: [%Message{value: Integer.to_string(number)}]
    }

    # Assert success; the offset itself is not needed here.
    {:ok, _offset} = KafkaEx.produce(request)
  end
end

阶乘消费者.exs

defmodule Consumer.FactorialConsumer do
  @moduledoc """
  `KafkaEx.GenConsumer` implementation that logs every message value it
  receives, and exposes a `factorial/1` helper.
  """

  use KafkaEx.GenConsumer
  require Logger
  alias KafkaEx.Protocol.Fetch.Message

  @doc """
  Logs the value of each message in `message_set` at debug level and asks
  for an asynchronous offset commit, leaving `state` unchanged.
  """
  @impl true
  def handle_message_set(message_set, state) do
    for %Message{value: value} <- message_set do
      # Lazy logging: the string is only built when debug level is enabled.
      Logger.debug(fn -> "message: " <> inspect(value) end)
    end

    {:async_commit, state}
  end

  @doc """
  Computes `n!` for a non-negative integer `n`.

  Raises `FunctionClauseError` for negative or non-integer input instead of
  recursing without end.
  """
  def factorial(0), do: 1
  def factorial(n) when is_integer(n) and n > 0, do: n * factorial(n - 1)
end

application.exs(消费者)

defmodule Consumer.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  # Starts the top-level supervisor with a single child: the KafkaEx consumer
  # group that runs Consumer.FactorialConsumer against the factorial topic.
  @impl true
  def start(_type, _args) do
    gen_consumer_impl = Consumer.FactorialConsumer
    consumer_group_name = "Factorials"
    topic_names = ["factorials-to-be-calculated"]
    consumer_group_opts = []

    children = [
      # Explicit child spec instead of the deprecated Supervisor.Spec.supervisor/2.
      # type: :supervisor tells the parent this child is itself a supervisor.
      %{
        id: KafkaEx.ConsumerGroup,
        start:
          {KafkaEx.ConsumerGroup, :start_link,
           [gen_consumer_impl, consumer_group_name, topic_names, consumer_group_opts]},
        type: :supervisor
      }
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Consumer.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

这是我运行 iex -S mix run 时遇到的错误。感谢你能给我的任何帮助。

编辑:我正在使用的库的链接(kafkaex)https://hexdocs.pm/kafka_ex/kafkaex.html
纯文本stacktrace:17:07:13.790[error]genserver#pid<0.220.0>终止**(caseclauseerror)没有case子句匹配:{:错误,{:退出,{:case子句,{:错误,{:未定义,[{consumer.factorialconsumer,:init,[“要计算的阶乘”,0,nil],[},{kafkaex.genconsumer,:init,1,[文件:'lib/kafkaex/genconsumer.ex',行:545]},{:gen\u-server,:init\u-it,2,[文件:'gen\u-server.erl',行:417]},{:gen\u-server,:init\u-it,6,[文件:'gen\u-server.erl',行:385]},{:proc\u-lib,:init\u-do\u-apply,3,[文件:'proc\u-lib.erl',行:226]}}},[{kafkaex.genconsumer.supervisor,:“-start\u-workers/3-fun-0-”,3,[文件:'lib/kafka\u-ex/gen-consumer/supervisor.ex',行:100]},{enum,:“-each/2-列出^foreach/1-0-”,2,[文件:'lib/enum.ex',行:786]},{kafkaex.genconsumer.supervisor,:start\u workers,3,[文件:'lib/kafka\u ex/gen\u consumer/supervisor.ex',行:99]},{kafkaex.genconsumer.supervisor,:start\u link,4,[文件:'lib/kafka\u ex/gen\u consumer/supervisor.ex',行:57]},{:supervisor,:do\u start\u child\u i,3,[文件:'supervisor.erl',行:385]},{:supervisor,:do\u start\u child,2,[文件:'supervisor.erl',行:371]},{:supervisor,:handle\u start\u child,2,[文件:'supervisor.erl',行:677]},{:supervisor,:handle\u call,3,[文件:'supervisor.erl',行:426]},{:child,:undefined,:consumer,{kafkaex.genconsumer.supervisor,:start\u link,[{kafkaex.genconsumer,consumer.factorialconsumer},“阶乘”,[{“要计算的阶乘”,0}],[commit\u interval:1000,generation\u id:224,member\u id:“kafka\u ex-dee44079-8cae-4432-9926-7d35f7d8c7dd”]},:permanent,:infinity,:supervisor,[kafkaex.genconsumer.supervisor]}}(kafka\u ex 0.11.0)lib/kafka\u ex/consumer\u group.ex:340:kafkaex.consumergroup.start\u consumer/5(kafka\u ex 0.11.0)lib/kafka\u ex/consumer\u group/manager.ex:479:kafkaex.consumergroup.manager.start\u consumer/2(kafka\u ex 0.11.0)lib/kafka\u ex/consumer\u group/manager.ex:204:kafkaex.consumergroup.manager.handle\u info/2(stdlib 3.13.2)gen\ u服务器。erl:680:gen_server.try_dispatch/4(stdlib 3.13.2)gen_server。erl:756:gen_server.handle_msg/6(stdlib 
3.13.2)进程库。erl:226::proc_lib.init_p_do_apply/3最后一条消息:{:退出,#pid<0.224.0>,{:关机,:重新平衡}}状态:%kafkaex.consumergroup.manager.state{分配:[],消费者模块:consumer.factorialconsumer,消费者选项:[commit\u interval:1000],消费者主管\u pid:\35; pid<0.225.0>,gen\u消费者模块:kafkaex.genconsumer,generation\u id:223,组名称:“factorials”,心跳间隔:1000,心跳计时器:\ pid<0.224.0>,领导者\u id:“kafka\u ex-dee44079-8cae-4432-9926-7d35f7d8c7dd”,成员id:“kafka_ex-dee44079-8cae-4432-9926-7d35f7d8c7dd”,成员:nil,分区分配回调:&kafkaex.consumergroup.partitionassignment.round_robin/2,会话超时:30000,会话超时填充:10000,主管pid:#pid<0.219.0>,主题:[“要计算的因子”],工作人员名称:#pid<0.221.0>]

5us2dqdw

5us2dqdw1#

我只是通过创建一个新的 mix 项目就把它修好了。

相关问题