Kafka Elixir 消费者不断崩溃

Kafka elixir consumer keeps crashing

我有 2 个 mix 项目:一个叫做 server,它向 Kafka 发布待计算阶乘的数字;另一个是 consumer,它应该消费这些消息并计算阶乘。但是当我启动消费者时,它一直在崩溃。

server.exs

defmodule Server do
  @moduledoc """
  Producer side of the factorial example.

  Manages the `"factorials-to-be-calculated"` Kafka topic and publishes
  random integers to it through `KafkaEx`.
  """

  alias KafkaEx.Protocol.CreateTopics.TopicRequest
  alias KafkaEx.Protocol.Produce.Message
  alias KafkaEx.Protocol.Produce.Request

  # Single source of truth for the topic name used by every function below.
  @topic "factorials-to-be-calculated"

  @doc """
  Creates the topic with one partition and a replication factor of one.

  Returns whatever `KafkaEx.create_topics/1` returns.
  """
  def create_topic do
    KafkaEx.create_topics([
      %TopicRequest{topic: @topic, num_partitions: 1, replication_factor: 1}
    ])
  end

  @doc """
  Deletes the topic.

  Returns whatever `KafkaEx.delete_topics/1` returns.
  """
  def delete_topic do
    # delete_topics expects a list of topic names, not a bare string.
    KafkaEx.delete_topics([@topic])
  end

  @doc """
  Picks a random integer in `min + 1..max`, prints it, and publishes it
  (as a decimal string) to the topic.

  `max` must be strictly greater than `min` (which defaults to `0`), because
  `:rand.uniform/1` requires a positive argument.

  Returns the `{:ok, offset}` tuple from `KafkaEx.produce/1`; raises a
  `MatchError` if the produce call returns anything else.
  """
  def generate_number(max, min \\ 0)
      when is_integer(max) and is_integer(min) and max > min do
    number = :rand.uniform(max - min) + min
    IO.puts(number)

    request = %Request{
      topic: @topic,
      required_acks: 1,
      messages: [%Message{value: Integer.to_string(number)}]
    }

    {:ok, _offset} = KafkaEx.produce(request)
  end
end

factorial_consumer.exs

defmodule Consumer.FactorialConsumer do
  @moduledoc """
  `KafkaEx.GenConsumer` implementation for the factorial topic.

  Each fetched message value is logged at debug level and the offset is
  committed asynchronously. `factorial/1` is provided as a helper; it is not
  called by `handle_message_set/2`.
  """

  use KafkaEx.GenConsumer

  require Logger

  alias KafkaEx.Protocol.Fetch.Message

  @doc """
  Logs the value of every message in `message_set` and requests an
  asynchronous offset commit, leaving `state` unchanged.
  """
  @impl true
  def handle_message_set(message_set, state) do
    for %Message{value: message} <- message_set do
      # The function form defers building the string until debug logging is enabled.
      Logger.debug(fn -> "message: " <> inspect(message) end)
    end

    {:async_commit, state}
  end

  @doc """
  Returns `n!` for a non-negative integer `n`.

  Raises `FunctionClauseError` for negative or non-integer input instead of
  recursing without end.
  """
  def factorial(0), do: 1
  def factorial(n) when is_integer(n) and n > 0, do: n * factorial(n - 1)
end

application.exs(消费者)

defmodule Consumer.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  @doc """
  Starts the application's supervision tree.

  The only child is a `KafkaEx.ConsumerGroup` supervisor that runs
  `Consumer.FactorialConsumer` for the `"factorials-to-be-calculated"` topic
  as a member of the `"Factorials"` consumer group.
  """
  @impl true
  def start(_type, _args) do
    gen_consumer_impl = Consumer.FactorialConsumer
    consumer_group_name = "Factorials"
    topic_names = ["factorials-to-be-calculated"]
    consumer_group_opts = []

    # Explicit child-spec map instead of the deprecated Supervisor.Spec.supervisor/2.
    # `type: :supervisor` keeps the same defaults (permanent restart, infinite shutdown).
    children = [
      %{
        id: KafkaEx.ConsumerGroup,
        start:
          {KafkaEx.ConsumerGroup, :start_link,
           [gen_consumer_impl, consumer_group_name, topic_names, consumer_group_opts]},
        type: :supervisor
      }
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Consumer.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

这是我在运行 `iex -S mix run` 时得到的错误:

感谢您能为我提供的任何帮助

编辑:我正在使用的库 (KafkaEx) 的链接:https://hexdocs.pm/kafka_ex/KafkaEx.html

明文堆栈跟踪:
17:07:13.790 [error] GenServer #PID<0.220.0> terminating
** (CaseClauseError) no case clause matching: {:error, {{:EXIT, {{:case_clause, {:error, {:undef, [{Consumer.FactorialConsumer, :init, ["factorials-to-be-calculated", 0, nil], []}, {KafkaEx.GenConsumer, :init, 1, [file: 'lib/kafka_ex/gen_consumer.ex', line: 545]}, {:gen_server, :init_it, 2, [file: 'gen_server.erl', line: 417]}, {:gen_server, :init_it, 6, [file: 'gen_server.erl', line: 385]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 226]}]}}}, [{KafkaEx.GenConsumer.Supervisor, :"-start_workers/3-fun-0-", 3, [file: 'lib/kafka_ex/gen_consumer/supervisor.ex', line: 100]}, {Enum, :"-each/2-lists^foreach/1-0-", 2, [file: 'lib/enum.ex', line: 786]}, {KafkaEx.GenConsumer.Supervisor, :start_workers, 3, [file: 'lib/kafka_ex/gen_consumer/supervisor.ex', line: 99]}, {KafkaEx.GenConsumer.Supervisor, :start_link, 4, [file: 'lib/kafka_ex/gen_consumer/supervisor.ex', line: 57]}, {:supervisor, :do_start_child_i, 3, [file: 'supervisor.erl', line: 385]}, {:supervisor, :do_start_child, 2, [file: 'supervisor.erl', line: 371]}, {:supervisor, :handle_start_child, 2, [file: 'supervisor.erl', line: 677]}, {:supervisor, :handle_call, 3, [file: 'supervisor.erl', line: 426]}]}}, {:child, :undefined, :consumer, {KafkaEx.GenConsumer.Supervisor, :start_link, [{KafkaEx.GenConsumer, Consumer.FactorialConsumer}, "Factorials", [{"factorials-to-be-calculated", 0}], [commit_interval: 1000, generation_id: 224, member_id: "kafka_ex-dee44079-8cae-4432-9926-7d35f7d8c7dd"]]}, :permanent, :infinity, :supervisor, [KafkaEx.GenConsumer.Supervisor]}}}
    (kafka_ex 0.11.0) lib/kafka_ex/consumer_group.ex:340: KafkaEx.ConsumerGroup.start_consumer/5
    (kafka_ex 0.11.0) lib/kafka_ex/consumer_group/manager.ex:479: KafkaEx.ConsumerGroup.Manager.start_consumer/2
    (kafka_ex 0.11.0) lib/kafka_ex/consumer_group/manager.ex:204: KafkaEx.ConsumerGroup.Manager.handle_info/2
    (stdlib 3.13.2) gen_server.erl:680: :gen_server.try_dispatch/4
    (stdlib 3.13.2) gen_server.erl:756: :gen_server.handle_msg/6
    (stdlib 3.13.2) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message: {:EXIT, #PID<0.224.0>, {:shutdown, :rebalance}}
State: %KafkaEx.ConsumerGroup.Manager.State{assignments: [], consumer_module: Consumer.FactorialConsumer, consumer_opts: [commit_interval: 1000], consumer_supervisor_pid: #PID<0.225.0>, gen_consumer_module: KafkaEx.GenConsumer, generation_id: 223, group_name: "Factorials", heartbeat_interval: 1000, heartbeat_timer: #PID<0.224.0>, leader_id: "kafka_ex-dee44079-8cae-4432-9926-7d35f7d8c7dd", member_id: "kafka_ex-dee44079-8cae-4432-9926-7d35f7d8c7dd", members: nil, partition_assignment_callback: &KafkaEx.ConsumerGroup.PartitionAssignment.round_robin/2, session_timeout: 30000, session_timeout_padding: 10000, supervisor_pid: #PID<0.219.0>, topics: ["factorials-to-be-calculated"], worker_name: #PID<0.221.0>}

我刚刚通过创建一个新的 mix 项目修复了这个问题。