GenStage.from_enumerable 因断断续续的流而挂起
GenStage.from_enumerable hangs with an intermitent stream
我有一个流,它产生数据的速度不如它消耗的速度快。
所以我有一个这样定义的生产者:
def start_link() do
create_stream
|> GenStage.from_enumerable(name: Producer)
end
然后我的生产者消费者订阅它
def init(:ok) do
{:producer_consumer, :the_state_does_not_matter, subscribe_to: [Producer]}
end
我的消费者订阅了 mu 生产者-消费者
def init(:ok) do
{:consumer, :the_state_does_not_matter, subscribe_to: [ProducerConsumer]}
end
我遇到的问题是消费者挂起,我认为是因为在某些时候生产者没有设法获取新数据并且如文档中所述:
When the enumerable finishes or halts, the stage will exit with
:normal reason. This means that, if a consumer subscribes to the
enumerable stage and the: cancel option is set to: permanent, which is
the default, the consumer will also exit with: the normal reason
所以我读了更多,它建议添加选项 cancel:: transient
以不完成该阶段。我是这样添加的,但是没有用,我是不是遗漏了什么?
|> GenStage.from_enumerable(name: Producer, cancel: :transient)
最初我使用的是 Flow.into_stages(flow, [ProducerConsumer])
但我不能这样做,因为我无法从我的主管树中引用(或者我不知道如何引用)ProducerConsumer
children = [
{Producer, []},
{ProducerConsumer, []},
{Consumer, []}
]
更新
更新从子定义Flow.into_stages传递的引用
children = [
{Producer, [name: ProducerConsumer]},
{ProducerConsumer, []},
{Consumer, []}
]
def start_link(producer_consumer) do
create_stream
|> Flow.into_stages(producer_consumer)
end
** (Mix) Could not start application test: Application.start(:normal, []) returned an error: shutdown:
failed to start child: Producer
** (EXIT) exited in: GenServer.call({:name, ProducerConsumer}, {:"$subscribe", nil, #PID<0.2031.0>, [cancel: :transient]}, 5000)
** (EXIT) no connection to Elixir.ProducerConsumer
错误:
** (Mix) Could not start application test: Application.start(:normal, []) returned an error: shutdown: failed to start child: Producer **
(EXIT) exited in: GenServer.call({:name, ProducerConsumer},
{:"$subscribe", nil, #PID<0.2031.0>, [cancel: :transient]}, 5000) **
(EXIT) no connection to Elixir.ProducerConsumer
只是意味着当 Flow.into_stages 尝试同步到提供的消费者时,该消费者必须已经是 运行。
所以,监督的时候顺序很重要,像这样:
children = [
Consumer,
FlowProducerWorker # worker which implements Flow.into_stages(flow, [Consumer])
]
我有一个流,它产生数据的速度不如它消耗的速度快。
所以我有一个这样定义的生产者:
def start_link() do
create_stream
|> GenStage.from_enumerable(name: Producer)
end
然后我的生产者消费者订阅它
def init(:ok) do
{:producer_consumer, :the_state_does_not_matter, subscribe_to: [Producer]}
end
我的消费者订阅了 mu 生产者-消费者
def init(:ok) do
{:consumer, :the_state_does_not_matter, subscribe_to: [ProducerConsumer]}
end
我遇到的问题是消费者挂起,我认为是因为在某些时候生产者没有设法获取新数据并且如文档中所述:
When the enumerable finishes or halts, the stage will exit with :normal reason. This means that, if a consumer subscribes to the enumerable stage and the: cancel option is set to: permanent, which is the default, the consumer will also exit with: the normal reason
所以我读了更多,它建议添加选项 cancel:: transient
以不完成该阶段。我是这样添加的,但是没有用,我是不是遗漏了什么?
|> GenStage.from_enumerable(name: Producer, cancel: :transient)
最初我使用的是 Flow.into_stages(flow, [ProducerConsumer])
但我不能这样做,因为我无法从我的主管树中引用(或者我不知道如何引用)ProducerConsumer
children = [
{Producer, []},
{ProducerConsumer, []},
{Consumer, []}
]
更新
更新从子定义Flow.into_stages传递的引用
children = [
{Producer, [name: ProducerConsumer]},
{ProducerConsumer, []},
{Consumer, []}
]
def start_link(producer_consumer) do
create_stream
|> Flow.into_stages(producer_consumer)
end
** (Mix) Could not start application test: Application.start(:normal, []) returned an error: shutdown: failed to start child: Producer ** (EXIT) exited in: GenServer.call({:name, ProducerConsumer}, {:"$subscribe", nil, #PID<0.2031.0>, [cancel: :transient]}, 5000) ** (EXIT) no connection to Elixir.ProducerConsumer
错误:
** (Mix) Could not start application test: Application.start(:normal, []) returned an error: shutdown: failed to start child: Producer ** (EXIT) exited in: GenServer.call({:name, ProducerConsumer}, {:"$subscribe", nil, #PID<0.2031.0>, [cancel: :transient]}, 5000) ** (EXIT) no connection to Elixir.ProducerConsumer
只是意味着当 Flow.into_stages 尝试同步到提供的消费者时,该消费者必须已经是 运行。
所以,监督的时候顺序很重要,像这样:
children = [
Consumer,
FlowProducerWorker # worker which implements Flow.into_stages(flow, [Consumer])
]