Kafka Elixir 消费者不断崩溃
Kafka elixir consumer keeps crashing
我有 2 个 mix 项目:一个叫 server,它向 Kafka 发布待计算的阶乘;另一个是消费者,负责计算这些阶乘。但是每当我启动消费者时,它就会不断崩溃。
server.exs
defmodule Server do
  @moduledoc """
  Producer-side helpers for the factorial demo: manages the Kafka topic and
  publishes random numbers to it through KafkaEx.
  """

  alias KafkaEx.Protocol.CreateTopics.TopicRequest
  alias KafkaEx.Protocol.Produce.Message
  alias KafkaEx.Protocol.Produce.Request

  # Single source of truth for the topic name used by every function below.
  @topic "factorials-to-be-calculated"

  @doc """
  Creates the `factorials-to-be-calculated` topic with one partition and a
  replication factor of 1. Returns whatever `KafkaEx.create_topics/1` returns.
  """
  def create_topic() do
    KafkaEx.create_topics([
      %TopicRequest{topic: @topic, num_partitions: 1, replication_factor: 1}
    ])
  end

  @doc """
  Deletes the `factorials-to-be-calculated` topic. Returns whatever
  `KafkaEx.delete_topics/1` returns.
  """
  def delete_topic() do
    # delete_topics expects a list of topic names, not a bare string.
    KafkaEx.delete_topics([@topic])
  end

  @doc """
  Picks a random integer in `min + 1..max`, prints it, and publishes it (as a
  decimal string) to the topic.

  `:rand.uniform/1` needs a positive argument, so `max` must be greater than
  `min` (`min` defaults to 0).

  Returns `{:ok, offset}`; raises `MatchError` if the produce call returns
  anything else.
  """
  def generate_number(max, min \\ 0) do
    number = :rand.uniform(max - min) + min
    IO.puts(number)

    request = %Request{
      topic: @topic,
      required_acks: 1,
      messages: [%Message{value: Integer.to_string(number)}]
    }

    # Assert the broker acknowledged the message; the offset itself is not used here.
    {:ok, _offset} = KafkaEx.produce(request)
  end
end
factorial_consumer.exs
defmodule Consumer.FactorialConsumer do
  @moduledoc """
  KafkaEx consumer for the factorial demo. It currently only logs each
  received message value at debug level; `factorial/1` is provided for
  computing the result.
  """

  use KafkaEx.GenConsumer

  require Logger

  alias KafkaEx.Protocol.Fetch.Message

  @doc """
  Logs the value of every message in `message_set` at debug level and asks for
  an asynchronous offset commit, leaving `state` unchanged.
  """
  def handle_message_set(message_set, state) do
    for %Message{value: message} <- message_set do
      Logger.debug(fn -> "message: " <> inspect(message) end)
    end

    {:async_commit, state}
  end

  @doc """
  Returns `n!` for a non-negative integer `n`.

  Raises `FunctionClauseError` for negative or non-integer input instead of
  recursing forever.
  """
  def factorial(0), do: 1
  def factorial(n) when is_integer(n) and n > 0, do: n * factorial(n - 1)
end
application.exs(消费者)
defmodule Consumer.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  # Starts the application's supervision tree with one child: a
  # KafkaEx.ConsumerGroup running Consumer.FactorialConsumer in the
  # "Factorials" group on the "factorials-to-be-calculated" topic.
  @impl true
  def start(_type, _args) do
    gen_consumer_impl = Consumer.FactorialConsumer
    consumer_group_name = "Factorials"
    topic_names = ["factorials-to-be-calculated"]
    consumer_group_opts = []

    # Explicit child-spec map instead of the deprecated
    # Supervisor.Spec.supervisor/2 helper; start arguments are unchanged.
    children = [
      %{
        id: KafkaEx.ConsumerGroup,
        start:
          {KafkaEx.ConsumerGroup, :start_link,
           [gen_consumer_impl, consumer_group_name, topic_names, consumer_group_opts]},
        type: :supervisor
      }
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Consumer.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
这是我在 运行 时得到的错误
iex -S mix run
感谢您能为我提供的任何帮助
编辑:
我正在使用的库 (KafkaEx) 的链接:https://hexdocs.pm/kafka_ex/KafkaEx.html
明文堆栈跟踪:
17:07:13.790 [error] GenServer #PID<0.220.0> terminating
** (CaseClauseError) no case clause matching: {:error, {{:EXIT, {{:case_clause, {:error, {:undef, [{Consumer.FactorialConsumer, :init, ["factorials-to-be-calculated", 0, nil], []}, {KafkaEx.GenConsumer, :init, 1, [file: 'lib/kafka_ex/gen_consumer.ex', line: 545]}, {:gen_server, :init_it, 2, [file: 'gen_server.erl', line: 417]}, {:gen_server, :init_it, 6, [file: 'gen_server.erl', line: 385]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 226]}]}}}, [{KafkaEx.GenConsumer.Supervisor, :"-start_workers/3-fun-0-", 3, [file: 'lib/kafka_ex/gen_consumer/supervisor.ex', line: 100]}, {Enum, :"-each/2-lists^foreach/1-0-", 2, [file: 'lib/enum.ex', line: 786]}, {KafkaEx.GenConsumer.Supervisor, :start_workers, 3, [file: 'lib/kafka_ex/gen_consumer/supervisor.ex', line: 99]}, {KafkaEx.GenConsumer.Supervisor, :start_link, 4, [file: 'lib/kafka_ex/gen_consumer/supervisor.ex', line: 57]}, {:supervisor, :do_start_child_i, 3, [file: 'supervisor.erl', line: 385]}, {:supervisor, :do_start_child, 2, [file: 'supervisor.erl', line: 371]}, {:supervisor, :handle_start_child, 2, [file: 'supervisor.erl', line: 677]}, {:supervisor, :handle_call, 3, [file: 'supervisor.erl', line: 426]}]}}, {:child, :undefined, :consumer, {KafkaEx.GenConsumer.Supervisor, :start_link, [{KafkaEx.GenConsumer, Consumer.FactorialConsumer}, "Factorials", [{"factorials-to-be-calculated", 0}], [commit_interval: 1000, generation_id: 224, member_id: "kafka_ex-dee44079-8cae-4432-9926-7d35f7d8c7dd"]]}, :permanent, :infinity, :supervisor, [KafkaEx.GenConsumer.Supervisor]}}}
(kafka_ex 0.11.0) lib/kafka_ex/consumer_group.ex:340: KafkaEx.ConsumerGroup.start_consumer/5
(kafka_ex 0.11.0) lib/kafka_ex/consumer_group/manager.ex:479: KafkaEx.ConsumerGroup.Manager.start_consumer/2
(kafka_ex 0.11.0) lib/kafka_ex/consumer_group/manager.ex:204: KafkaEx.ConsumerGroup.Manager.handle_info/2
(stdlib 3.13.2) gen_server.erl:680: :gen_server.try_dispatch/4
(stdlib 3.13.2) gen_server.erl:756: :gen_server.handle_msg/6
(stdlib 3.13.2) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message: {:EXIT, #PID<0.224.0>, {:shutdown, :rebalance}}
State: %KafkaEx.ConsumerGroup.Manager.State{assignments: [], consumer_module: Consumer.FactorialConsumer, consumer_opts: [commit_interval: 1000], consumer_supervisor_pid: #PID<0.225.0>, gen_consumer_module: KafkaEx.GenConsumer, generation_id: 223, group_name: "Factorials", heartbeat_interval: 1000, heartbeat_timer: #PID<0.224.0>, leader_id: "kafka_ex-dee44079-8cae-4432-9926-7d35f7d8c7dd", member_id: "kafka_ex-dee44079-8cae-4432-9926-7d35f7d8c7dd", members: nil, partition_assignment_callback: &KafkaEx.ConsumerGroup.PartitionAssignment.round_robin/2, session_timeout: 30000, session_timeout_padding: 10000, supervisor_pid: #PID<0.219.0>, topics: ["factorials-to-be-calculated"], worker_name: #PID<0.221.0>}
我刚刚通过创建一个新的 mix 项目修复了它
我有 2 个 mix 项目:一个叫 server,它向 Kafka 发布待计算的阶乘;另一个是消费者,负责计算这些阶乘。但是每当我启动消费者时,它就会不断崩溃。
server.exs
defmodule Server do
  @moduledoc """
  Producer-side helpers for the factorial demo: manages the Kafka topic and
  publishes random numbers to it through KafkaEx.
  """

  alias KafkaEx.Protocol.CreateTopics.TopicRequest
  alias KafkaEx.Protocol.Produce.Message
  alias KafkaEx.Protocol.Produce.Request

  # Single source of truth for the topic name used by every function below.
  @topic "factorials-to-be-calculated"

  @doc """
  Creates the `factorials-to-be-calculated` topic with one partition and a
  replication factor of 1. Returns whatever `KafkaEx.create_topics/1` returns.
  """
  def create_topic() do
    KafkaEx.create_topics([
      %TopicRequest{topic: @topic, num_partitions: 1, replication_factor: 1}
    ])
  end

  @doc """
  Deletes the `factorials-to-be-calculated` topic. Returns whatever
  `KafkaEx.delete_topics/1` returns.
  """
  def delete_topic() do
    # delete_topics expects a list of topic names, not a bare string.
    KafkaEx.delete_topics([@topic])
  end

  @doc """
  Picks a random integer in `min + 1..max`, prints it, and publishes it (as a
  decimal string) to the topic.

  `:rand.uniform/1` needs a positive argument, so `max` must be greater than
  `min` (`min` defaults to 0).

  Returns `{:ok, offset}`; raises `MatchError` if the produce call returns
  anything else.
  """
  def generate_number(max, min \\ 0) do
    number = :rand.uniform(max - min) + min
    IO.puts(number)

    request = %Request{
      topic: @topic,
      required_acks: 1,
      messages: [%Message{value: Integer.to_string(number)}]
    }

    # Assert the broker acknowledged the message; the offset itself is not used here.
    {:ok, _offset} = KafkaEx.produce(request)
  end
end
factorial_consumer.exs
defmodule Consumer.FactorialConsumer do
  @moduledoc """
  KafkaEx consumer for the factorial demo. It currently only logs each
  received message value at debug level; `factorial/1` is provided for
  computing the result.
  """

  use KafkaEx.GenConsumer

  require Logger

  alias KafkaEx.Protocol.Fetch.Message

  @doc """
  Logs the value of every message in `message_set` at debug level and asks for
  an asynchronous offset commit, leaving `state` unchanged.
  """
  def handle_message_set(message_set, state) do
    for %Message{value: message} <- message_set do
      Logger.debug(fn -> "message: " <> inspect(message) end)
    end

    {:async_commit, state}
  end

  @doc """
  Returns `n!` for a non-negative integer `n`.

  Raises `FunctionClauseError` for negative or non-integer input instead of
  recursing forever.
  """
  def factorial(0), do: 1
  def factorial(n) when is_integer(n) and n > 0, do: n * factorial(n - 1)
end
application.exs(消费者)
defmodule Consumer.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  # Starts the application's supervision tree with one child: a
  # KafkaEx.ConsumerGroup running Consumer.FactorialConsumer in the
  # "Factorials" group on the "factorials-to-be-calculated" topic.
  @impl true
  def start(_type, _args) do
    gen_consumer_impl = Consumer.FactorialConsumer
    consumer_group_name = "Factorials"
    topic_names = ["factorials-to-be-calculated"]
    consumer_group_opts = []

    # Explicit child-spec map instead of the deprecated
    # Supervisor.Spec.supervisor/2 helper; start arguments are unchanged.
    children = [
      %{
        id: KafkaEx.ConsumerGroup,
        start:
          {KafkaEx.ConsumerGroup, :start_link,
           [gen_consumer_impl, consumer_group_name, topic_names, consumer_group_opts]},
        type: :supervisor
      }
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Consumer.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
这是我在 运行 时得到的错误
iex -S mix run
感谢您能为我提供的任何帮助
编辑:我正在使用的库 (KafkaEx) 的链接:https://hexdocs.pm/kafka_ex/KafkaEx.html
明文堆栈跟踪: 17:07:13.790 [error] GenServer #PID<0.220.0> terminating ** (CaseClauseError) no case clause matching: {:error, {{:EXIT, {{:case_clause, {:error, {:undef, [{Consumer.FactorialConsumer, :init, ["factorials-to-be-calculated", 0, nil], []}, {KafkaEx.GenConsumer, :init, 1, [file: 'lib/kafka_ex/gen_consumer.ex', line: 545]}, {:gen_server, :init_it, 2, [file: 'gen_server.erl', line: 417]}, {:gen_server, :init_it, 6, [file: 'gen_server.erl', line: 385]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 226]}]}}}, [{KafkaEx.GenConsumer.Supervisor, :"-start_workers/3-fun-0-", 3, [file: 'lib/kafka_ex/gen_consumer/supervisor.ex', line: 100]}, {Enum, :"-each/2-lists^foreach/1-0-", 2, [file: 'lib/enum.ex', line: 786]}, {KafkaEx.GenConsumer.Supervisor, :start_workers, 3, [file: 'lib/kafka_ex/gen_consumer/supervisor.ex', line: 99]}, {KafkaEx.GenConsumer.Supervisor, :start_link, 4, [file: 'lib/kafka_ex/gen_consumer/supervisor.ex', line: 57]}, {:supervisor, :do_start_child_i, 3, [file: 'supervisor.erl', line: 385]}, {:supervisor, :do_start_child, 2, [file: 'supervisor.erl', line: 371]}, {:supervisor, :handle_start_child, 2, [file: 'supervisor.erl', line: 677]}, {:supervisor, :handle_call, 3, [file: 'supervisor.erl', line: 426]}]}}, {:child, :undefined, :consumer, {KafkaEx.GenConsumer.Supervisor, :start_link, [{KafkaEx.GenConsumer, Consumer.FactorialConsumer}, "Factorials", [{"factorials-to-be-calculated", 0}], [commit_interval: 1000, generation_id: 224, member_id: "kafka_ex-dee44079-8cae-4432-9926-7d35f7d8c7dd"]]}, :permanent, :infinity, :supervisor, [KafkaEx.GenConsumer.Supervisor]}}} (kafka_ex 0.11.0) lib/kafka_ex/consumer_group.ex:340: KafkaEx.ConsumerGroup.start_consumer/5 (kafka_ex 0.11.0) lib/kafka_ex/consumer_group/manager.ex:479: KafkaEx.ConsumerGroup.Manager.start_consumer/2 (kafka_ex 0.11.0) lib/kafka_ex/consumer_group/manager.ex:204: KafkaEx.ConsumerGroup.Manager.handle_info/2 (stdlib 3.13.2) gen_server.erl:680: :gen_server.try_dispatch/4 (stdlib 3.13.2) gen_server.erl:756: :gen_server.handle_msg/6 (stdlib 3.13.2) proc_lib.erl:226: :proc_lib.init_p_do_apply/3 Last message: {:EXIT,
#PID<0.224.0>, {:shutdown, :rebalance}} State: %KafkaEx.ConsumerGroup.Manager.State{assignments: [], consumer_module: Consumer.FactorialConsumer, consumer_opts: [commit_interval: 1000], consumer_supervisor_pid: #PID<0.225.0>, gen_consumer_module: KafkaEx.GenConsumer, generation_id: 223, group_name: "Factorials", heartbeat_interval: 1000, heartbeat_timer: #PID<0.224.0>, leader_id: "kafka_ex-dee44079-8cae-4432-9926-7d35f7d8c7dd", member_id: "kafka_ex-dee44079-8cae-4432-9926-7d35f7d8c7dd", members: nil, partition_assignment_callback: &KafkaEx.ConsumerGroup.PartitionAssignment.round_robin/2, session_timeout: 30000, session_timeout_padding: 10000, supervisor_pid: #PID<0.219.0>, topics: ["factorials-to-be-calculated"], worker_name: #PID<0.221.0>}
我刚刚通过创建一个新的 mix 项目修复了它