了解进程停止接收消息的原因
Understanding why process stops receiving messages
我有 3 个进程的情况:
- 一个进程充当消息的调度程序:
server
- 一个进程充当主管(对于一个工人):
monitor
- 一个进程充当工人,完成后通知主管:
worker
server
发送监听请求,monitor
首先检查worker
是否busy.If忙,monitor
将消息入队(如果有容量未到达)或将其转发到 worker
。
当 worker 完成处理时,它会通知 client 和 monitor
我的问题是我的工作进程在处理完第一条消息后停止响应
-module(mq).
-compile(export_all).
-record(monstate,{
queue,
qc,
wpid,
free=true,
wref,
init=false,
frun=false
}).
-record(sstate,{
init=false,
mpid=null,
mref=null
}).
-define(QUEUE_SIZE,5).
-define(PROC_SLEEP,2000).
createProcess({M,F,A})->
Pid=spawn(M,F,[A]),
Ref=erlang:monitor(process,Pid),
{Pid,Ref}.
start()->
spawn(?MODULE,server,[#sstate{init=false}]).
server(State=#sstate{init=I})when I=:=false ->
{MPid,MRef}=createProcess({?MODULE,monitor,#monstate{init=false}}),
server(State#sstate{init=true,mpid=MPid,mref=MRef});
server(State=#sstate{mpid=MPid,mref=MRef})->
receive
{From,state}->From ! State,
server(State);
{From,Message}-> MPid ! {request,{From,Message}},
server(State);
{'DOWN',MRef,process,MPid,_}-> {NewMPid,NewMRef}=createProcess({?MODULE,monitor,#monstate{init=false}}),
server(State#sstate{mpid=NewMPid,mref=NewMRef});
_ ->exit(invalid_message)
end.
tryEnqueue(Message,MState=#monstate{queue=Q,qc=C}) when C<?QUEUE_SIZE->
NewQueue=queue:in(Message,Q),
{queued,MState#monstate{qc=C+1,queue=NewQueue}};
tryEnqueue(_,MState)->{queue_full,MState}.
monitor(MState=#monstate{wpid=_,wref=_,init=I}) when I=:= false ->
{WorkerPid,WorkerRef}=createProcess({?MODULE,worker,self()}),
monitor(MState#monstate{wpid=WorkerPid,wref=WorkerRef,init=true,qc=0,queue=queue:new(),frun=true});
monitor(MState=#monstate{wpid=W,free=F,wref=Ref,queue=Q,qc=C,frun=R})->
receive
{request,{From ,Message}} ->
{Result,NewState}=tryEnqueue({From,Message},MState),
case Result of
queue_full -> From ! {queue_full,Message};
_ -> ok
end,
case R of
true -> self() ! {worker,{finished,R}},
monitor(NewState#monstate{frun=false});
false -> monitor(NewState#monstate{frun=false})
end;
{worker,{finished,_}}-> case queue:out(Q) of
{{_,Element},Rest} -> W ! Element,
monitor(MState#monstate{free=false,queue=Rest,qc=C-1});
{empty,Rest} -> monitor(MState#monstate{free=true,queue=Rest})
end;
{'DOWN',Ref,process,_,_}->
{NewWorkerPid,NewWorkerRef}=createProcess({?MODULE,worker,self()}),
monitor(MState#monstate{wpid=NewWorkerPid,wref=NewWorkerRef,free=true});
_->exit(invalid_message)
end.
worker(MPid)->
receive
{From,MSG} ->
timer:sleep(?PROC_SLEEP),
From ! {processed,MSG},
MPid ! {worker,{finished,MSG}},
worker(MPid);
_ ->exit(bad_msg)
end.
用法
2> A=mq:start().
<0.83.0>
3> A ! {self(),aa}.
{<0.76.0>,aa}
4> flush().
Shell got {processed,aa}
ok
5> A ! {self(),aa}.
{<0.76.0>,aa}
6> flush().
ok
我添加了一个跟踪器来查看发生了什么:
10> dbg:tracer().
{ok,<0.96.0>}
11> dbg:p(new,[sos,m]).
{ok,[{matched,nonode@nohost,0}]}
第一个运行:
14> A ! {self(),aa}.
(<0.100.0>) << {<0.76.0>,aa} // message received my server
(<0.100.0>) <0.101.0> ! {request,{<0.76.0>,aa}} //message forwarded by server to monitor
{<0.76.0>,aa}
(<0.101.0>) << {request,{<0.76.0>,aa}}
15> (<0.101.0>) <0.101.0> ! {worker,{finished,true}} //monitor starting the cycle
15> (<0.101.0>) << {worker,{finished,true}}
15> (<0.101.0>) <0.102.0> ! {<0.76.0>,aa} // monitor sending message to worker
15> (<0.102.0>) << {<0.76.0>,aa}
15> (<0.105.0>) <0.62.0> ! {io_request,<0.105.0>,
#Ref<0.3226054513.2760638467.167990>,
{get_until,unicode,
["15",62,32],
erl_scan,tokens,
[1,[text]]}}
15> (<0.102.0>) << timeout //worker getting timeout ??
15> (<0.102.0>) <0.76.0> ! {processed,aa} //worker sends to self() thje message
15> (<0.102.0>) <0.101.0> ! {worker,{finished,aa}} //worker notifies monitor to update state
15> (<0.101.0>) << {worker,{finished,aa}}
第二个运行:
15> A ! {self(),aa}.
(<0.100.0>) << {<0.76.0>,aa}
(<0.100.0>) <0.101.0> ! {request,{<0.76.0>,aa}} //monitor receiveing message
{<0.76.0>,aa}
(<0.101.0>) << {request,{<0.76.0>,aa}}
16> (<0.106.0>) <0.62.0> ! {io_request,<0.106.0>,
#Ref<0.3226054513.2760638467.168007>,
{get_until,unicode,
["16",62,32],
erl_scan,tokens,
[1,[text]]}}
正如您从我的跟踪中看到的那样,在第一次通话中我不明白什么 happens.Does 我的 worker
超时了,如果是的话,为什么?
P.S frun
变量用作标志,仅在第一次 monitor
迭代时为真,因此当第一次项目到达过程将调用自身来处理它(将其发送给工人),因为工人是免费的。
在第一个 运行 之后,只要 worker
发出他空闲的信号,monitor
就会将项目从队列中取出。
更新
因此,在收到有用的评论后,我在 monitor
中稍微更改了我的逻辑,以便 worker
在第一个 运行 或完成后收到消息并通知 monitor
, monitor
的队列中仍有项目。
我还是做不到 work.Where 是僵局吗?
monitor(MState=#monstate{wpid=W,free=F,wref=Ref,queue=Q,qc=C,frun=FirstRun})->
receive
{request,{From ,Message}} -> case FirstRun of
true -> W ! {From,Message},
monitor(MState#monstate{frun=false,free=false});
false ->
St=case tryEnqueue({From,Message},MState) of
{queue_full,S} -> From ! {queue_full,Message},
S;
{queued,S} -> S
end,
monitor(St)
end;
{worker,{finished,_}}-> case queue:out(Q) of
{{_,Element},Rest} -> W ! Element,
monitor(MState#monstate{free=false,queue=Rest,qc=C-1});
{empty,Rest} -> monitor(MState#monstate{free=true,queue=Rest})
end;
end.
在你的代码中,似乎 frun
在第一个 运行:
之后总是假的
case R of
true -> self() ! {worker,{finished,R}},
monitor(NewState#monstate{frun=false});
false -> monitor(NewState#monstate{frun=false})
end;
一旦到达 false
,将不会传送 {worker, {finished, R}}
消息,因此不会从队列中提取任何元素。
更新:死锁序列:
- Monitor 收到第一份工作并将其转发给 worker
- 监视器的
frun
现在是假的
- 工作人员执行工作
- 工作人员通知班长工作已完成
- 因为监视器的队列是空的,所以没有任何反应。
- Monitor 收到第二份作业,因为
frun
为 false,作业不会转发给 worker
monitor
行为需要依赖于 frun
。它只需要取决于 worker
是否为 free
。我更新了 monitor
函数以在以下代码中反映这一点。
-module(mq).
-compile(export_all).
-record(monstate,{
queue,
qc,
wpid,
free=true,
wref,
init=false
}).
-record(sstate,{
init=false,
mpid=null,
mref=null
}).
-define(QUEUE_SIZE,5).
-define(PROC_SLEEP,2000).
createProcess({M,F,A})->
Pid=spawn(M,F,[A]),
Ref=erlang:monitor(process,Pid),
{Pid,Ref}.
start()->
spawn(?MODULE,server,[#sstate{init=false}]).
server(State=#sstate{init=I})when I=:=false ->
{MPid,MRef}=createProcess({?MODULE,monitor,#monstate{init=false}}),
server(State#sstate{init=true,mpid=MPid,mref=MRef});
server(State=#sstate{mpid=MPid,mref=MRef})->
receive
{From,state}->From ! State,
server(State);
{From,Message}-> MPid ! {request,{From,Message}},
server(State);
{'DOWN',MRef,process,MPid,_}-> {NewMPid,NewMRef}=createProcess({?MODULE,monitor,#monstate{init=false}}),
server(State#sstate{mpid=NewMPid,mref=NewMRef});
_ ->exit(invalid_message)
end.
tryEnqueue(Message,MState=#monstate{queue=Q,qc=C}) when C<?QUEUE_SIZE->
NewQueue=queue:in(Message,Q),
{queued,MState#monstate{qc=C+1,queue=NewQueue}};
tryEnqueue(_,MState)->{queue_full,MState}.
monitor(MState=#monstate{wpid=_,wref=_,init=I}) when I=:= false ->
{WorkerPid,WorkerRef}=createProcess({?MODULE,worker,self()}),
monitor(MState#monstate{wpid=WorkerPid,wref=WorkerRef,init=true,qc=0,queue=queue:new()});
monitor(MState=#monstate{wpid=W,free=F,wref=Ref,queue=Q,qc=C})->
receive
{request,{From ,Message}} ->
%% check whether worker is free or not
case F of
true ->
W ! {From,Message},
monitor(MState#monstate{free=false});
false ->
St=case tryEnqueue({From,Message},MState) of
{queue_full,S} ->
From ! {queue_full,Message},
S;
{queued,S} -> S
end,
monitor(St)
end;
{worker,{finished,_}} ->
case queue:out(Q) of
{{_,Element},Rest} ->
W ! Element,
monitor(MState#monstate{free=false,queue=Rest,qc=C-1});
{empty,Rest} ->
monitor(MState#monstate{free=true,queue=Rest})
end;
{'DOWN',Ref,process,_,_} ->
{NewWorkerPid,NewWorkerRef}=createProcess({?MODULE,worker,self()}),
monitor(MState#monstate{wpid=NewWorkerPid,wref=NewWorkerRef,free=true});
_->exit(invalid_message)
end.
worker(MPid)->
receive
{From,MSG} ->
timer:sleep(?PROC_SLEEP),
From ! {processed,MSG},
MPid ! {worker,{finished,MSG}},
worker(MPid);
_ ->exit(bad_msg)
end.
用法
Eshell V10.5 (abort with ^G)
1> c(mq).
mq.erl:2: Warning: export_all flag enabled - all functions will be exported
{ok,mq}
2> A=mq:start().
<0.92.0>
3> A ! {self(),aa}.
{<0.85.0>,aa}
4> flush().
Shell got {processed,aa}
ok
5> A ! {self(),aa}.
{<0.85.0>,aa}
6> flush().
Shell got {processed,aa}
ok
7> A ! {self(), aa}, A ! {self(), bb}.
{<0.85.0>,bb}
8> flush().
Shell got {processed,aa}
Shell got {processed,bb}
ok
9>
我有 3 个进程的情况:
- 一个进程充当消息的调度程序:
server
- 一个进程充当主管(对于一个工人):
monitor
- 一个进程充当工人,完成后通知主管:
worker
server
发送监听请求,monitor
首先检查worker
是否busy.If忙,monitor
将消息入队(如果有容量未到达)或将其转发到 worker
。
当 worker 完成处理时,它会通知 client 和 monitor
我的问题是我的工作进程在处理完第一条消息后停止响应
-module(mq).
-compile(export_all).
-record(monstate,{
queue,
qc,
wpid,
free=true,
wref,
init=false,
frun=false
}).
-record(sstate,{
init=false,
mpid=null,
mref=null
}).
-define(QUEUE_SIZE,5).
-define(PROC_SLEEP,2000).
createProcess({M,F,A})->
Pid=spawn(M,F,[A]),
Ref=erlang:monitor(process,Pid),
{Pid,Ref}.
start()->
spawn(?MODULE,server,[#sstate{init=false}]).
server(State=#sstate{init=I})when I=:=false ->
{MPid,MRef}=createProcess({?MODULE,monitor,#monstate{init=false}}),
server(State#sstate{init=true,mpid=MPid,mref=MRef});
server(State=#sstate{mpid=MPid,mref=MRef})->
receive
{From,state}->From ! State,
server(State);
{From,Message}-> MPid ! {request,{From,Message}},
server(State);
{'DOWN',MRef,process,MPid,_}-> {NewMPid,NewMRef}=createProcess({?MODULE,monitor,#monstate{init=false}}),
server(State#sstate{mpid=NewMPid,mref=NewMRef});
_ ->exit(invalid_message)
end.
tryEnqueue(Message,MState=#monstate{queue=Q,qc=C}) when C<?QUEUE_SIZE->
NewQueue=queue:in(Message,Q),
{queued,MState#monstate{qc=C+1,queue=NewQueue}};
tryEnqueue(_,MState)->{queue_full,MState}.
monitor(MState=#monstate{wpid=_,wref=_,init=I}) when I=:= false ->
{WorkerPid,WorkerRef}=createProcess({?MODULE,worker,self()}),
monitor(MState#monstate{wpid=WorkerPid,wref=WorkerRef,init=true,qc=0,queue=queue:new(),frun=true});
monitor(MState=#monstate{wpid=W,free=F,wref=Ref,queue=Q,qc=C,frun=R})->
receive
{request,{From ,Message}} ->
{Result,NewState}=tryEnqueue({From,Message},MState),
case Result of
queue_full -> From ! {queue_full,Message};
_ -> ok
end,
case R of
true -> self() ! {worker,{finished,R}},
monitor(NewState#monstate{frun=false});
false -> monitor(NewState#monstate{frun=false})
end;
{worker,{finished,_}}-> case queue:out(Q) of
{{_,Element},Rest} -> W ! Element,
monitor(MState#monstate{free=false,queue=Rest,qc=C-1});
{empty,Rest} -> monitor(MState#monstate{free=true,queue=Rest})
end;
{'DOWN',Ref,process,_,_}->
{NewWorkerPid,NewWorkerRef}=createProcess({?MODULE,worker,self()}),
monitor(MState#monstate{wpid=NewWorkerPid,wref=NewWorkerRef,free=true});
_->exit(invalid_message)
end.
worker(MPid)->
receive
{From,MSG} ->
timer:sleep(?PROC_SLEEP),
From ! {processed,MSG},
MPid ! {worker,{finished,MSG}},
worker(MPid);
_ ->exit(bad_msg)
end.
用法
2> A=mq:start().
<0.83.0>
3> A ! {self(),aa}.
{<0.76.0>,aa}
4> flush().
Shell got {processed,aa}
ok
5> A ! {self(),aa}.
{<0.76.0>,aa}
6> flush().
ok
我添加了一个跟踪器来查看发生了什么:
10> dbg:tracer().
{ok,<0.96.0>}
11> dbg:p(new,[sos,m]).
{ok,[{matched,nonode@nohost,0}]}
第一个运行:
14> A ! {self(),aa}.
(<0.100.0>) << {<0.76.0>,aa} // message received my server
(<0.100.0>) <0.101.0> ! {request,{<0.76.0>,aa}} //message forwarded by server to monitor
{<0.76.0>,aa}
(<0.101.0>) << {request,{<0.76.0>,aa}}
15> (<0.101.0>) <0.101.0> ! {worker,{finished,true}} //monitor starting the cycle
15> (<0.101.0>) << {worker,{finished,true}}
15> (<0.101.0>) <0.102.0> ! {<0.76.0>,aa} // monitor sending message to worker
15> (<0.102.0>) << {<0.76.0>,aa}
15> (<0.105.0>) <0.62.0> ! {io_request,<0.105.0>,
#Ref<0.3226054513.2760638467.167990>,
{get_until,unicode,
["15",62,32],
erl_scan,tokens,
[1,[text]]}}
15> (<0.102.0>) << timeout //worker getting timeout ??
15> (<0.102.0>) <0.76.0> ! {processed,aa} //worker sends to self() thje message
15> (<0.102.0>) <0.101.0> ! {worker,{finished,aa}} //worker notifies monitor to update state
15> (<0.101.0>) << {worker,{finished,aa}}
第二个运行:
15> A ! {self(),aa}.
(<0.100.0>) << {<0.76.0>,aa}
(<0.100.0>) <0.101.0> ! {request,{<0.76.0>,aa}} //monitor receiveing message
{<0.76.0>,aa}
(<0.101.0>) << {request,{<0.76.0>,aa}}
16> (<0.106.0>) <0.62.0> ! {io_request,<0.106.0>,
#Ref<0.3226054513.2760638467.168007>,
{get_until,unicode,
["16",62,32],
erl_scan,tokens,
[1,[text]]}}
正如您从我的跟踪中看到的那样,在第一次通话中我不明白什么 happens.Does 我的 worker
超时了,如果是的话,为什么?
P.S frun
变量用作标志,仅在第一次 monitor
迭代时为真,因此当第一次项目到达过程将调用自身来处理它(将其发送给工人),因为工人是免费的。
在第一个 运行 之后,只要 worker
发出他空闲的信号,monitor
就会将项目从队列中取出。
更新
因此,在收到有用的评论后,我在 monitor
中稍微更改了我的逻辑,以便 worker
在第一个 运行 或完成后收到消息并通知 monitor
, monitor
的队列中仍有项目。
我还是做不到 work.Where 是僵局吗?
monitor(MState=#monstate{wpid=W,free=F,wref=Ref,queue=Q,qc=C,frun=FirstRun})->
receive
{request,{From ,Message}} -> case FirstRun of
true -> W ! {From,Message},
monitor(MState#monstate{frun=false,free=false});
false ->
St=case tryEnqueue({From,Message},MState) of
{queue_full,S} -> From ! {queue_full,Message},
S;
{queued,S} -> S
end,
monitor(St)
end;
{worker,{finished,_}}-> case queue:out(Q) of
{{_,Element},Rest} -> W ! Element,
monitor(MState#monstate{free=false,queue=Rest,qc=C-1});
{empty,Rest} -> monitor(MState#monstate{free=true,queue=Rest})
end;
end.
在你的代码中,似乎 frun
在第一个 运行:
case R of
true -> self() ! {worker,{finished,R}},
monitor(NewState#monstate{frun=false});
false -> monitor(NewState#monstate{frun=false})
end;
一旦到达 false
,将不会传送 {worker, {finished, R}}
消息,因此不会从队列中提取任何元素。
更新:死锁序列:
- Monitor 收到第一份工作并将其转发给 worker
- 监视器的
frun
现在是假的 - 工作人员执行工作
- 工作人员通知班长工作已完成
- 因为监视器的队列是空的,所以没有任何反应。
- Monitor 收到第二份作业,因为
frun
为 false,作业不会转发给 worker
monitor
行为需要依赖于 frun
。它只需要取决于 worker
是否为 free
。我更新了 monitor
函数以在以下代码中反映这一点。
-module(mq).
-compile(export_all).
-record(monstate,{
queue,
qc,
wpid,
free=true,
wref,
init=false
}).
-record(sstate,{
init=false,
mpid=null,
mref=null
}).
-define(QUEUE_SIZE,5).
-define(PROC_SLEEP,2000).
createProcess({M,F,A})->
Pid=spawn(M,F,[A]),
Ref=erlang:monitor(process,Pid),
{Pid,Ref}.
start()->
spawn(?MODULE,server,[#sstate{init=false}]).
server(State=#sstate{init=I})when I=:=false ->
{MPid,MRef}=createProcess({?MODULE,monitor,#monstate{init=false}}),
server(State#sstate{init=true,mpid=MPid,mref=MRef});
server(State=#sstate{mpid=MPid,mref=MRef})->
receive
{From,state}->From ! State,
server(State);
{From,Message}-> MPid ! {request,{From,Message}},
server(State);
{'DOWN',MRef,process,MPid,_}-> {NewMPid,NewMRef}=createProcess({?MODULE,monitor,#monstate{init=false}}),
server(State#sstate{mpid=NewMPid,mref=NewMRef});
_ ->exit(invalid_message)
end.
tryEnqueue(Message,MState=#monstate{queue=Q,qc=C}) when C<?QUEUE_SIZE->
NewQueue=queue:in(Message,Q),
{queued,MState#monstate{qc=C+1,queue=NewQueue}};
tryEnqueue(_,MState)->{queue_full,MState}.
monitor(MState=#monstate{wpid=_,wref=_,init=I}) when I=:= false ->
{WorkerPid,WorkerRef}=createProcess({?MODULE,worker,self()}),
monitor(MState#monstate{wpid=WorkerPid,wref=WorkerRef,init=true,qc=0,queue=queue:new()});
monitor(MState=#monstate{wpid=W,free=F,wref=Ref,queue=Q,qc=C})->
receive
{request,{From ,Message}} ->
%% check whether worker is free or not
case F of
true ->
W ! {From,Message},
monitor(MState#monstate{free=false});
false ->
St=case tryEnqueue({From,Message},MState) of
{queue_full,S} ->
From ! {queue_full,Message},
S;
{queued,S} -> S
end,
monitor(St)
end;
{worker,{finished,_}} ->
case queue:out(Q) of
{{_,Element},Rest} ->
W ! Element,
monitor(MState#monstate{free=false,queue=Rest,qc=C-1});
{empty,Rest} ->
monitor(MState#monstate{free=true,queue=Rest})
end;
{'DOWN',Ref,process,_,_} ->
{NewWorkerPid,NewWorkerRef}=createProcess({?MODULE,worker,self()}),
monitor(MState#monstate{wpid=NewWorkerPid,wref=NewWorkerRef,free=true});
_->exit(invalid_message)
end.
worker(MPid)->
receive
{From,MSG} ->
timer:sleep(?PROC_SLEEP),
From ! {processed,MSG},
MPid ! {worker,{finished,MSG}},
worker(MPid);
_ ->exit(bad_msg)
end.
用法
Eshell V10.5 (abort with ^G)
1> c(mq).
mq.erl:2: Warning: export_all flag enabled - all functions will be exported
{ok,mq}
2> A=mq:start().
<0.92.0>
3> A ! {self(),aa}.
{<0.85.0>,aa}
4> flush().
Shell got {processed,aa}
ok
5> A ! {self(),aa}.
{<0.85.0>,aa}
6> flush().
Shell got {processed,aa}
ok
7> A ! {self(), aa}, A ! {self(), bb}.
{<0.85.0>,bb}
8> flush().
Shell got {processed,aa}
Shell got {processed,bb}
ok
9>