了解进程停止接收消息的原因

Understanding why process stops receiving messages

我有一个涉及 3 个进程的场景:

server 将请求发送给 monitor,monitor 首先检查 worker 是否忙碌。如果 worker 忙碌,monitor 会将消息放入队列(前提是队列容量未满);否则将消息转发给 worker。当 worker 处理完成后,它会同时通知 client 和 monitor。

我的问题是我的工作进程在处理完第一条消息后停止响应

-module(mq).
%% export_all: server/1, monitor/1 and worker/1 are started with spawn/3
%% (via ?MODULE), which requires them to be exported.
-compile(export_all).

%% State carried by the monitor process loop (monitor/1).
-record(monstate,{
    queue,      % pending jobs: a queue:queue() of {From, Message} tuples
    qc,         % number of jobs currently held in `queue`
    wpid,       % pid of the current worker process
    free=true,  % true when the worker is idle
    wref,       % erlang:monitor/2 reference for the worker process
    init=false, % false until the worker has been spawned
    frun=false  % set to true at init; cleared when a request is handled
}).
%% State carried by the front-end server loop (server/1).
-record(sstate,{
    init=false, % false until the monitor process has been spawned
    mpid=null,  % pid of the monitor process
    mref=null   % erlang:monitor/2 reference for the monitor process
}).

%% Maximum number of jobs tryEnqueue/2 accepts into the monitor's queue.
-define(QUEUE_SIZE,5).
%% Simulated processing time per job, in milliseconds (passed to timer:sleep/1).
-define(PROC_SLEEP,2000).


%% createProcess({M, F, A}) -> {Pid, MonitorRef}
%% Spawns M:F(A) (A is passed as the single argument) and monitors the new
%% process, returning its pid and the monitor reference.
%% spawn_monitor/3 spawns and monitors atomically. With a separate spawn/3
%% followed by erlang:monitor/2, a child that exits before the monitor is in
%% place is reported with reason `noproc` instead of its real exit reason.
createProcess({M,F,A})->
    spawn_monitor(M, F, [A]).

%% start() -> pid()
%% Spawns the front-end server process (server/1) in its uninitialised state
%% and returns its pid; server/1 then spawns the monitor on its first call.
start() ->
    InitialState = #sstate{init = false},
    spawn(?MODULE, server, [InitialState]).

%% server(State) -- front-end loop that forwards client requests to the monitor.
%% First call (init = false): spawn and monitor the monitor process, then loop.
%% In the loop:
%%   {From, state}   -> reply with the current #sstate{} record
%%   {From, Message} -> forward as {request, {From, Message}} to the monitor
%%   'DOWN' of the monitor -> spawn and monitor a fresh monitor process
%%   anything else   -> exit(invalid_message)
server(#sstate{init = false} = State) ->
    {MonPid, MonRef} = createProcess({?MODULE, monitor, #monstate{init = false}}),
    server(State#sstate{init = true, mpid = MonPid, mref = MonRef});
server(#sstate{mpid = MonPid, mref = MonRef} = State) ->
    receive
        {From, state} ->
            From ! State,
            server(State);
        {From, Message} ->
            MonPid ! {request, {From, Message}},
            server(State);
        {'DOWN', MonRef, process, MonPid, _Reason} ->
            {NewMonPid, NewMonRef} =
                createProcess({?MODULE, monitor, #monstate{init = false}}),
            server(State#sstate{mpid = NewMonPid, mref = NewMonRef});
        _Other ->
            exit(invalid_message)
    end.
  

%% tryEnqueue(Message, MState) -> {queued, NewMState} | {queue_full, MState}
%% Appends Message to the monitor's queue and bumps the qc counter while fewer
%% than ?QUEUE_SIZE jobs are waiting; otherwise returns the state unchanged.
tryEnqueue(Message, #monstate{queue = Pending, qc = Count} = MState)
        when Count < ?QUEUE_SIZE ->
    Grown = queue:in(Message, Pending),
    {queued, MState#monstate{queue = Grown, qc = Count + 1}};
tryEnqueue(_Message, MState) ->
    {queue_full, MState}.

%% monitor(MState) -- owns the worker process and the queue of pending jobs.
%%
%% First clause (init = false): spawns and monitors a worker, then loops with
%% an empty queue, qc = 0 and frun = true.
%%
%% NOTE(review): this version deadlocks on the second request. Every request
%% is enqueued (even when the worker is idle), and the queue is only drained
%% when a {worker, {finished, _}} message arrives. The monitor sends that
%% message to itself only while frun is true (the first request); afterwards
%% it only comes from the worker after finishing a job. So a job that arrives
%% once the worker is idle and the queue is empty is never dispatched.
%% The bound variable F (free) is never consulted.
monitor(MState=#monstate{wpid=_,wref=_,init=I}) when I=:= false ->
    {WorkerPid,WorkerRef}=createProcess({?MODULE,worker,self()}),
    monitor(MState#monstate{wpid=WorkerPid,wref=WorkerRef,init=true,qc=0,queue=queue:new(),frun=true});

monitor(MState=#monstate{wpid=W,free=F,wref=Ref,queue=Q,qc=C,frun=R})->
    receive
        
        {request,{From ,Message}} ->  
                                       %% Always enqueue; tell the client when the queue is full.
                                       {Result,NewState}=tryEnqueue({From,Message},MState),
                                        case Result of 
                                            queue_full -> From ! {queue_full,Message};
                                            _ -> ok
                                        end,
                                        %% Only when frun (R) is true does the monitor kick
                                        %% itself to dequeue; both branches clear frun.
                                        case R of
                                            true -> self() ! {worker,{finished,R}},
                                                    monitor(NewState#monstate{frun=false});
                                            false -> monitor(NewState#monstate{frun=false})
                                        end;
                                       

        %% Worker finished (or the first-request self-kick): send the oldest
        %% queued job to the worker, or mark the worker free if none is queued.
        {worker,{finished,_}}-> case queue:out(Q) of
                                    {{_,Element},Rest} -> W ! Element,
                                                    monitor(MState#monstate{free=false,queue=Rest,qc=C-1});
                                    {empty,Rest} -> monitor(MState#monstate{free=true,queue=Rest})
                                end;

        %% Worker died: spawn and monitor a replacement and mark it free.
        {'DOWN',Ref,process,_,_}->
             {NewWorkerPid,NewWorkerRef}=createProcess({?MODULE,worker,self()}),
             monitor(MState#monstate{wpid=NewWorkerPid,wref=NewWorkerRef,free=true});

        %% Any other message terminates the monitor.
        _->exit(invalid_message)

    end.

%% worker(MPid) -- job-processing loop.
%% Each {Client, Job} message is "processed" by sleeping ?PROC_SLEEP ms. The
%% worker then replies {processed, Job} to Client and reports
%% {worker, {finished, Job}} to the monitor MPid before waiting for the next
%% job. Any message that is not a 2-tuple makes it exit(bad_msg).
worker(MPid) ->
    receive
        {Client, Job} ->
            timer:sleep(?PROC_SLEEP),
            Done = {processed, Job},
            Client ! Done,
            MPid ! {worker, {finished, Job}},
            worker(MPid);
        _Unexpected ->
            exit(bad_msg)
    end.

用法

2> A=mq:start().
<0.83.0>
3> A ! {self(),aa}.
{<0.76.0>,aa}
4> flush().
Shell got {processed,aa}
ok
5> A ! {self(),aa}.
{<0.76.0>,aa}
6> flush().
ok

我添加了一个跟踪器来查看发生了什么:

10> dbg:tracer().
{ok,<0.96.0>}
11> dbg:p(new,[sos,m]).
{ok,[{matched,nonode@nohost,0}]}

第一次运行:

14> A ! {self(),aa}.
(<0.100.0>) << {<0.76.0>,aa}     // message received by server
(<0.100.0>) <0.101.0> ! {request,{<0.76.0>,aa}}   //message forwarded by server to monitor
{<0.76.0>,aa}
(<0.101.0>) << {request,{<0.76.0>,aa}}         
15> (<0.101.0>) <0.101.0> ! {worker,{finished,true}} //monitor starting the cycle
15> (<0.101.0>) << {worker,{finished,true}}  
15> (<0.101.0>) <0.102.0> ! {<0.76.0>,aa}  // monitor sending message to worker
15> (<0.102.0>) << {<0.76.0>,aa}
15> (<0.105.0>) <0.62.0> ! {io_request,<0.105.0>,
                           #Ref<0.3226054513.2760638467.167990>,
                           {get_until,unicode,
                               ["15",62,32],
                               erl_scan,tokens,
                               [1,[text]]}}
15> (<0.102.0>) << timeout                      //worker getting timeout ??
15> (<0.102.0>) <0.76.0> ! {processed,aa}    //worker sends the result back to the client (the shell)
15> (<0.102.0>) <0.101.0> ! {worker,{finished,aa}}   //worker notifies monitor to update state
15> (<0.101.0>) << {worker,{finished,aa}}

第二次运行:

15> A ! {self(),aa}.
(<0.100.0>) << {<0.76.0>,aa}
(<0.100.0>) <0.101.0> ! {request,{<0.76.0>,aa}}   //message forwarded by server to monitor
{<0.76.0>,aa}
(<0.101.0>) << {request,{<0.76.0>,aa}}
16> (<0.106.0>) <0.62.0> ! {io_request,<0.106.0>,
                           #Ref<0.3226054513.2760638467.168007>,
                           {get_until,unicode,
                               ["16",62,32],
                               erl_scan,tokens,
                               [1,[text]]}}

正如您从跟踪中看到的,我不明白第一次调用时到底发生了什么。我的 worker 超时了吗?如果是,为什么?

附注:frun 变量用作标志,仅在 monitor 的第一次迭代时为 true。这样当第一个项目到达时,monitor 会向自身发送消息来处理它(即把它发送给 worker),因为此时 worker 是空闲的。在第一次运行之后,只要 worker 发出空闲信号,monitor 就会从队列中取出项目。

更新

在收到有用的评论后,我稍微修改了 monitor 中的逻辑:worker 会在第一次运行时收到消息;或者在它完成工作并通知 monitor、且 monitor 的队列中仍有项目时收到消息。但我仍然无法让它正常工作。死锁出在哪里?

%% Updated second clause of monitor/1. Only this clause is shown; unlike the
%% original version it has no 'DOWN' or catch-all receive clause.
%%
%% NOTE(review): this still deadlocks. Only the very first request
%% (FirstRun = true) goes straight to the worker. Every later request is
%% enqueued, even when the worker is idle (free = true). The queue is only
%% drained when a {worker, {finished, _}} message arrives. So once the worker
%% has finished and found the queue empty, the next request stays queued
%% forever. F (free) is bound but never consulted, and Ref is unused here.
%% NOTE(review): the `;` after the last receive clause (just before the
%% closing `end.`) is a syntax error as written.
monitor(MState=#monstate{wpid=W,free=F,wref=Ref,queue=Q,qc=C,frun=FirstRun})->
        receive
            {request,{From ,Message}} -> case FirstRun of
                                            true ->  W ! {From,Message},
                                                     monitor(MState#monstate{frun=false,free=false});                                                     
                                            false -> 
                                                     St=case tryEnqueue({From,Message},MState) of 
                                                           {queue_full,S} -> From ! {queue_full,Message},
                                                                             S;
                                                           {queued,S} -> S
                                                        end,
                                                     monitor(St)
                                             end;
                                                                        
            {worker,{finished,_}}-> case queue:out(Q) of
                                        {{_,Element},Rest} -> W ! Element,
                                                        monitor(MState#monstate{free=false,queue=Rest,qc=C-1});
                                        {empty,Rest} -> monitor(MState#monstate{free=true,queue=Rest})
                                    end;

        end.

在你的代码中,frun 在第一次运行之后似乎总是 false:
            %% Both branches clear frun, so the self-sent {worker, {finished, _}}
            %% that triggers a dequeue only ever happens on the first request.
            case R of
                true -> self() ! {worker,{finished,R}},
                        monitor(NewState#monstate{frun=false});
                false -> monitor(NewState#monstate{frun=false})
            end;

一旦 frun 变为 false,monitor 就不会再向自己发送 {worker, {finished, R}} 消息,因此(在 worker 已空闲时)不会再从队列中取出任何元素。

更新:死锁序列:

  1. monitor 收到第一个作业并将其转发给 worker
  2. monitor 的 frun 现在为 false
  3. worker 执行该作业
  4. worker 通知 monitor 作业已完成
  5. 由于 monitor 的队列为空,什么也不会发生(worker 被标记为空闲)
  6. monitor 收到第二个作业;由于 frun 为 false,该作业只会被放入队列,而不会转发给 worker

monitor 的行为不需要依赖 frun,只需要取决于 worker 是否空闲(free)。我更新了 monitor 函数以反映这一点,代码如下。

-module(mq).
%% export_all: server/1, monitor/1 and worker/1 are started with spawn/3
%% (via ?MODULE), which requires them to be exported.
-compile(export_all).

%% State carried by the monitor process loop (monitor/1).
-record(monstate,{
    queue,      % pending jobs: a queue:queue() of {From, Message} tuples
    qc,         % number of jobs currently held in `queue`
    wpid,       % pid of the current worker process
    free=true,  % true when the worker is idle; requests then go straight to it
    wref,       % erlang:monitor/2 reference for the worker process
    init=false  % false until the worker has been spawned
}).
%% State carried by the front-end server loop (server/1).
-record(sstate,{
    init=false, % false until the monitor process has been spawned
    mpid=null,  % pid of the monitor process
    mref=null   % erlang:monitor/2 reference for the monitor process
}).

%% Maximum number of jobs tryEnqueue/2 accepts while the worker is busy.
-define(QUEUE_SIZE,5).
%% Simulated processing time per job, in milliseconds (passed to timer:sleep/1).
-define(PROC_SLEEP,2000).


%% createProcess({M, F, A}) -> {Pid, MonitorRef}
%% Spawns M:F(A) (A is passed as the single argument) and monitors the new
%% process, returning its pid and the monitor reference.
%% spawn_monitor/3 spawns and monitors atomically. With a separate spawn/3
%% followed by erlang:monitor/2, a child that exits before the monitor is in
%% place is reported with reason `noproc` instead of its real exit reason.
createProcess({M,F,A})->
    spawn_monitor(M, F, [A]).

%% start() -> pid()
%% Spawns the front-end server process (server/1) in its uninitialised state
%% and returns its pid; server/1 then spawns the monitor on its first call.
start() ->
    InitialState = #sstate{init = false},
    spawn(?MODULE, server, [InitialState]).

%% server(State) -- front-end loop that forwards client requests to the monitor.
%% First call (init = false): spawn and monitor the monitor process, then loop.
%% In the loop:
%%   {From, state}   -> reply with the current #sstate{} record
%%   {From, Message} -> forward as {request, {From, Message}} to the monitor
%%   'DOWN' of the monitor -> spawn and monitor a fresh monitor process
%%   anything else   -> exit(invalid_message)
server(#sstate{init = false} = State) ->
    {MonPid, MonRef} = createProcess({?MODULE, monitor, #monstate{init = false}}),
    server(State#sstate{init = true, mpid = MonPid, mref = MonRef});
server(#sstate{mpid = MonPid, mref = MonRef} = State) ->
    receive
        {From, state} ->
            From ! State,
            server(State);
        {From, Message} ->
            MonPid ! {request, {From, Message}},
            server(State);
        {'DOWN', MonRef, process, MonPid, _Reason} ->
            {NewMonPid, NewMonRef} =
                createProcess({?MODULE, monitor, #monstate{init = false}}),
            server(State#sstate{mpid = NewMonPid, mref = NewMonRef});
        _Other ->
            exit(invalid_message)
    end.


%% tryEnqueue(Message, MState) -> {queued, NewMState} | {queue_full, MState}
%% Appends Message to the monitor's queue and bumps the qc counter while fewer
%% than ?QUEUE_SIZE jobs are waiting; otherwise returns the state unchanged.
tryEnqueue(Message, #monstate{queue = Pending, qc = Count} = MState)
        when Count < ?QUEUE_SIZE ->
    Grown = queue:in(Message, Pending),
    {queued, MState#monstate{queue = Grown, qc = Count + 1}};
tryEnqueue(_Message, MState) ->
    {queue_full, MState}.

%% monitor(MState) -- owns the worker process and a bounded FIFO of pending jobs.
%%
%% First clause (init = false): spawn and monitor a worker and start with an
%% empty queue (qc = 0). The second clause is the message loop:
%%   {request, {From, Message}} -- if the worker is free, hand the job to it
%%       directly; otherwise enqueue it via tryEnqueue/2, replying
%%       {queue_full, Message} to From when the queue is already full.
%%   {worker, {finished, _}}    -- the worker is done: give it the oldest
%%       queued job, or mark it free when nothing is waiting.
%%   'DOWN' for the worker      -- spawn and monitor a replacement worker and
%%       immediately give it the oldest queued job, if any.
%%   anything else              -- exit(invalid_message).
monitor(MState = #monstate{init = false}) ->
    {WorkerPid, WorkerRef} = createProcess({?MODULE, worker, self()}),
    monitor(MState#monstate{wpid = WorkerPid, wref = WorkerRef, init = true,
                            qc = 0, queue = queue:new()});
monitor(MState = #monstate{wpid = W, free = F, wref = Ref}) ->
    receive
        {request, {From, Message}} ->
            case F of
                true ->
                    %% Idle worker: bypass the queue.
                    W ! {From, Message},
                    monitor(MState#monstate{free = false});
                false ->
                    %% Busy worker: park the job, or reject it when the queue is full.
                    NewState = case tryEnqueue({From, Message}, MState) of
                                   {queue_full, S} ->
                                       From ! {queue_full, Message},
                                       S;
                                   {queued, S} ->
                                       S
                               end,
                    monitor(NewState)
            end;
        {worker, {finished, _}} ->
            monitor(dispatchNext(MState));
        {'DOWN', Ref, process, _, _} ->
            %% Drain the queue on restart. Merely marking the replacement free
            %% would strand queued jobs until some new request arrived, and
            %% that new request would then jump ahead of them.
            %% NOTE(review): the job the dead worker was running is not retried
            %% and its client gets no reply; #monstate{} does not record it.
            {NewWorkerPid, NewWorkerRef} = createProcess({?MODULE, worker, self()}),
            monitor(dispatchNext(MState#monstate{wpid = NewWorkerPid,
                                                 wref = NewWorkerRef}));
        _ ->
            exit(invalid_message)
    end.

%% dispatchNext(MState) -> MState'
%% Sends the oldest queued job to the current worker and marks it busy, or
%% marks the worker free when the queue is empty.
dispatchNext(MState = #monstate{wpid = W, queue = Q, qc = C}) ->
    case queue:out(Q) of
        {{value, Job}, Rest} ->
            W ! Job,
            MState#monstate{free = false, queue = Rest, qc = C - 1};
        {empty, Rest} ->
            MState#monstate{free = true, queue = Rest}
    end.

%% worker(MPid) -- job-processing loop.
%% Each {Client, Job} message is "processed" by sleeping ?PROC_SLEEP ms. The
%% worker then replies {processed, Job} to Client and reports
%% {worker, {finished, Job}} to the monitor MPid before waiting for the next
%% job. Any message that is not a 2-tuple makes it exit(bad_msg).
worker(MPid) ->
    receive
        {Client, Job} ->
            timer:sleep(?PROC_SLEEP),
            Done = {processed, Job},
            Client ! Done,
            MPid ! {worker, {finished, Job}},
            worker(MPid);
        _Unexpected ->
            exit(bad_msg)
    end.

用法

Eshell V10.5  (abort with ^G)
1> c(mq).
mq.erl:2: Warning: export_all flag enabled - all functions will be exported
{ok,mq}
2> A=mq:start().
<0.92.0>
3> A ! {self(),aa}.
{<0.85.0>,aa}
4> flush().
Shell got {processed,aa}
ok
5> A ! {self(),aa}.
{<0.85.0>,aa}
6> flush().
Shell got {processed,aa}
ok
7> A ! {self(), aa}, A ! {self(), bb}.
{<0.85.0>,bb}
8> flush().                           
Shell got {processed,aa}
Shell got {processed,bb}
ok
9>