长生不老药 GenStage 生产者状态
elixir GenStage producer state
有一个生产者,它由值列表初始化。
defmodule GenstageExample.Producer do
use GenStage
require Logger
def start_link(initial \ 0) do
GenStage.start_link(__MODULE__, initial, name: __MODULE__)
end
def init(el_list) do
Logger.info "producer init #{inspect(el_list)}"
{:producer, el_list}
end
def handle_demand(demand, [head|tail]) do
Logger.info("producer handle_demand #{demand}, #{inspect(head)}, #{inspect(tail)}")
{:noreply, [head], tail}
end
end
有producer-consumer,它从producer那里收到一个值,用这个值执行一些动作。
defmodule GenstageExample.ProducerConsumer do
use GenStage
require Logger
def start_link do
GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
end
def init(state) do
Logger.info "producer_consumer init #{inspect(state)}"
{:producer_consumer, state, subscribe_to: [GenstageExample.Producer]}
end
def handle_events(events, _from, state) do
Logger.info "producer_consumer handle_events #{inspect(events)}, #{inspect(state)}"
{:noreply, events, state}
end
end
并且有一个消费者只是显示这个值。
defmodule GenstageExample.Consumer do
use GenStage
require Logger
def start_link do
GenStage.start_link(__MODULE__, :state_doesnt_matter)
end
def init(state) do
Logger.info "consumer init #{inspect(state)}"
{:consumer, state, subscribe_to: [GenstageExample.ProducerConsumer]}
end
def handle_events(events, _from, state) do
Logger.info "consumer handle_events #{inspect(events)}, #{inspect(state)}"
for event <- events do
IO.inspect {self(), event, state}
end
{:noreply, [], state}
end
end
我的application.ex:
defmodule GenstageExample.Application do
@moduledoc false
use Application
def start(_type, _args) do
import Supervisor.Spec, warn: false
worker(GenstageExample.Producer, [["el0", "el1", "el2"]]),
worker(GenstageExample.ProducerConsumer, []),
worker(GenstageExample.Consumer, [])
]
opts = [strategy: :one_for_one, name: GenstageExample.Supervisor]
Supervisor.start_link(children, opts)
end
end
启动后,看到这样的日志:
11:56:42.631 [info] producer init ["el0", "el1", "el2"]
11:56:42.633 [info] producer_consumer init :state_doesnt_matter
11:56:42.633 [info] producer handle_demand 1000, "el0", ["el1", "el2"]
11:56:42.634 [info] consumer init :state_doesnt_matter
11:56:42.634 [info] producer_consumer handle_events ["el0"], :state_doesnt_matter
11:56:42.634 [info] consumer handle_events ["el0"], :state_doesnt_matter
{#PID<0.189.0>, "el0", :state_doesnt_matter}
就是这样,没有其他事情发生。
据我了解,在 {: noreply, [head], tail}
中,我为生产者指定了一个新状态。为什么生产者-消费者不请求此列表中的下一项?
需要指定最大和最小需求
def init(state) do
Logger.info "producer_consumer init #{inspect(state)}"
{:producer_consumer, state, subscribe_to: [{GenstageExample.Producer, max_demand: 1, min_demand: 0}]}
end
和
def init(state) do
Logger.info "consumer init #{inspect(state)}"
{:consumer, state, subscribe_to: [{GenstageExample.ProducerConsumer, max_demand: 1, min_demand: 0}]}
end
有一个生产者,它由值列表初始化。
defmodule GenstageExample.Producer do
use GenStage
require Logger
def start_link(initial \ 0) do
GenStage.start_link(__MODULE__, initial, name: __MODULE__)
end
def init(el_list) do
Logger.info "producer init #{inspect(el_list)}"
{:producer, el_list}
end
def handle_demand(demand, [head|tail]) do
Logger.info("producer handle_demand #{demand}, #{inspect(head)}, #{inspect(tail)}")
{:noreply, [head], tail}
end
end
有producer-consumer,它从producer那里收到一个值,用这个值执行一些动作。
defmodule GenstageExample.ProducerConsumer do
use GenStage
require Logger
def start_link do
GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
end
def init(state) do
Logger.info "producer_consumer init #{inspect(state)}"
{:producer_consumer, state, subscribe_to: [GenstageExample.Producer]}
end
def handle_events(events, _from, state) do
Logger.info "producer_consumer handle_events #{inspect(events)}, #{inspect(state)}"
{:noreply, events, state}
end
end
并且有一个消费者只是显示这个值。
defmodule GenstageExample.Consumer do
use GenStage
require Logger
def start_link do
GenStage.start_link(__MODULE__, :state_doesnt_matter)
end
def init(state) do
Logger.info "consumer init #{inspect(state)}"
{:consumer, state, subscribe_to: [GenstageExample.ProducerConsumer]}
end
def handle_events(events, _from, state) do
Logger.info "consumer handle_events #{inspect(events)}, #{inspect(state)}"
for event <- events do
IO.inspect {self(), event, state}
end
{:noreply, [], state}
end
end
我的application.ex:
defmodule GenstageExample.Application do
@moduledoc false
use Application
def start(_type, _args) do
import Supervisor.Spec, warn: false
worker(GenstageExample.Producer, [["el0", "el1", "el2"]]),
worker(GenstageExample.ProducerConsumer, []),
worker(GenstageExample.Consumer, [])
]
opts = [strategy: :one_for_one, name: GenstageExample.Supervisor]
Supervisor.start_link(children, opts)
end
end
启动后,看到这样的日志:
11:56:42.631 [info] producer init ["el0", "el1", "el2"]
11:56:42.633 [info] producer_consumer init :state_doesnt_matter
11:56:42.633 [info] producer handle_demand 1000, "el0", ["el1", "el2"]
11:56:42.634 [info] consumer init :state_doesnt_matter
11:56:42.634 [info] producer_consumer handle_events ["el0"], :state_doesnt_matter
11:56:42.634 [info] consumer handle_events ["el0"], :state_doesnt_matter
{#PID<0.189.0>, "el0", :state_doesnt_matter}
就是这样,没有其他事情发生。
据我了解,在 {: noreply, [head], tail}
中,我为生产者指定了一个新状态。为什么生产者-消费者不请求此列表中的下一项?
需要指定最大和最小需求
def init(state) do
Logger.info "producer_consumer init #{inspect(state)}"
{:producer_consumer, state, subscribe_to: [{GenstageExample.Producer, max_demand: 1, min_demand: 0}]}
end
和
def init(state) do
Logger.info "consumer init #{inspect(state)}"
{:consumer, state, subscribe_to: [{GenstageExample.ProducerConsumer, max_demand: 1, min_demand: 0}]}
end