为什么线程只以大间隔同步?

Why are threads synchronizing only with large intervals?

FPC 3.0.0,拉撒路 1.6.0

我在一台Linux机器上有两个线程(待会儿再去查看Windows,这段代码应该是跨平台的),主线程和客户端线程。客户端线程的目的是提供非阻塞请求-回复网络功能。

pascal客户端(请求者)与服务器通信,写在python(回复者)中。

我正在使用 ZeroMQ REQ/REP 到低级的东西,最后的版本。

在高层,创建和启动客户端线程是这样的:

//from the main thread
FClientThread.Create('tcp://localhost:5020');
FClientThread.Start;

发送请求:

//from the main thread
FClientThread.SendRequest('im_requesting_stuff',-1,'give_it_to_me');
Sleep(1000)
FClientThread.SendRequest('requesting',-1,'gimme_more'); 

//the TClientThread descends from TThread.
procedure TClientThread.SendRequest(ACode: string; ATrialIndex: integer;
  ARequest: string);
begin
  FRequest := ARequest;
  FCode := ACode;
  FTrialIndex := IntToStr(ATrialIndex);
  RTLeventSetEvent(FRTLEvent);
end;

procedure TClientThread.Execute;
var
  AMessage : UTF8String;
begin
  while not Terminated do
    begin
      AMessage := '';
      RTLeventWaitFor(FRTLEvent);

      FRequester.send( FRequest );
      FRequester.recv( AMessage ); // blocking code,
                                   // that is why it is inside the thread

      // *****************************************************************************
      // BUG: These vars are being filled with the last values of
      //            ( 'FTrialIndex',               'AMessage',               'FCode' )
      FMsg := #40#39 + FTrialIndex + #39#44#32#39 + AMessage + #39#44#32#39 + FCode + #39#41;
      // 
      // *****************************************************************************

      Synchronize( @Showstatus );
    end;
end;  

服务器端以非阻塞方式工作,单线程:

while True:
   msg = self.socket.recv(flags=zmq.NOBLOCK)
   # do stuff
   self.socket.send(response)

只有在非常短的时间间隔内多次调用 .SendRequest()(例如没有 sleep())时,才会出现评论的错误(上次调用的重复)。

我预计会有一些损失,但我没想到会重复,因为我明确地等待主线程 return。

有什么建议吗?

此致


编辑 1:

ZMQ 版本

python

# zmq
print zmq.zmq_version()
# 4.1.2

# pyzmq
print zmq.__version__
# 15.1.0

帕斯卡

对于 delphizmq 版本,请参阅提交 (50a28b4b72c531536452ee5d79e19f5960c9f7c7)。

// zmq
ZMQVersion(minor,major,patch)
WriteLn(minor,major,patch)
// 3.2.5

编辑 2:

日志

图例:

LogTickCount : ('REQTickCount', 'TestCode:DeltaTime')

在将 F 变量复制到局部变量之前:

4476.028775652 : [debug] TClockThread.Execute:Start 140736007759616
4476.087363892 : ('4476.08701411', 'S:7.316086894')
4478.567764596 : ('4478.56744415', '*R:9.828251263')
4478.923743380 : ('4478.92343718', 'R:10.192552661')
4479.134910639 : [debug] C: expected next
4479.166311115 : ('4479.16600296', 'R:10.424417498') // BUG
4479.201287606 : ('4479.20099538', 'R:10.424417498') 
4483.037278107 : ('4483.03664644', '2b:14.322538449')
4483.868644929 : ('4483.86663063', '*R:15.153748820')
4484.278650579 : ('4484.27829493', 'R:15.562894551')
4484.552199446 : [debug] C: expected next
// data lost
4484.594517381 : ('4484.59364657', 'R:15.841710775')
4490.088956444 : ('4490.08862455', '1b:21.318677098')
4490.465529156 : ('4490.46522659', '*R:21.738692793')
4490.744151772 : ('4490.74376093', 'R:22.027325175')
4491.047253601 : [debug] C: expected next
4491.064188821 : ('4491.0638315', 'R:22.336764091') // BUG
4491.100142009 : ('4491.09978804', 'R:22.336764091')
4496.084726117 : ('4496.08441014', '2a:27.320218402')
4496.494107080 : ('4496.49378719', '*R:27.762184396')
4496.816623032 : ('4496.81630259', 'R:28.100685410')
4497.088603481 : [debug] C: expected next
// data lost
4497.132445971 : ('4497.13214788', 'R:28.378107825')

将 Fvariables 复制到局部变量后(注意:数据没有丢失纯属幸运):

8183.233467050 : [debug] TClockThread.Execute:Start 140736477513472
8183.299773049 : ('8183.29946881', 'S:10.678368191')
8183.751976738 : ('8183.7516616', '*R:11.182574383')
8184.262027198 : ('8184.26170452', 'R:11.690292332')
8184.501553063 : [debug] C: is expected next
8184.517362396 : ('8184.51705341', 'C:11.952652441')
8184.552388891 : ('8184.55206258', 'R:11.950063743')
8189.291033037 : ('8189.29070225', '2b:16.683417064')
8189.880999859 : ('8189.88067746', '*R:17.299351333')
8190.257476710 : ('8190.25714788', 'R:17.699402216')
8190.515557895 : [debug] C: is expected next
8190.553234830 : ('8190.55290992', 'C:17.966527084')
8190.588187291 : ('8190.5878607', 'R:17.964049297')
8196.243242766 : ('8196.24264239', '1b:23.683231358')
8196.758162472 : ('8196.75784342', '*R:24.200630247')
8196.964202600 : ('8196.96383131', 'R:24.397590095')
8197.119091206 : [debug] C: is expected next
8197.142731450 : ('8197.14240346', 'C:24.570427216')
8197.177078782 : ('8197.17673878', 'R:24.567590990')
8208.243614030 : ('8208.24296169', '2a:35.685189479')
8208.659747197 : ('8208.65940133', '*R:36.092803071')
8208.794614317 : ('8208.79431818', 'R:36.230488258')
8208.909275842 : [debug] C: is expected next
8208.935924407 : ('8208.93559782', 'C:36.360198934')
8208.970681025 : ('8208.9703712', 'R:36.357781493')

事实证明,使用 关键部分 对于我的特定实现来说不是必需的。


编辑 3.:

请注意,目前的问题不是关于数据丢失(排队请求)的问题。它是关于线程同步(变量共享)的。关于数据丢失,使用 Sleep() 只是一个 hack(你应该使用队列)。

我用 Sleep(10) 测试了代码,它减少了数据丢失的机会。但您应该注意,这是一个基于幸运的 hack。

可能是行程安排的问题。您使用 FClientThread.Create('tcp://localhost:5020'); 创建了一个可执行代码(线程)并告诉了 O.S。 "thread is ready, you can schedule it whenever it's time" 与 FClientThread.Start;

使用 FClientThread.SendRequest('im_requesting_stuff',-1,'give_it_to_me'); 发送参数并不能保证您的线程会立即 运行。当前上下文将与主线程一起使用,而调度程序还不能启动您新创建的线程。 Sleep(1000) 在调度方面变化不大。如果您的代码适用于 Sleep(1000) 那么请相信我,这纯属运气;为什么?您无法确定调度程序何时调度您的线程,这就是原因。因此,当您调用 FClientThread.SendRequest('requesting',-1,'gimme_more'); 时,您仍然在主线程中并设置客户端线程的变量。每当您的客户端线程被调度时,它都会使用其当前变量,在本例中设置了两次。

阻塞参数传递机制应该在SendRequest方法中实现,我想。并确保有很多方法可以实现。

长话短说,都是关于线程同步和调度的。

PS: Sleep(0) 会绕过问题,但不会完全解决问题。同步问题仍然存在,但第二个线程可以在给定的量子时间内获得调度并完成 I/O 的机会。

PS2: Sleep(0) : "嘿,O.S。我说完了,你不用等到你给我分配的全部时间来打扰我。这里你可以利用我剩下的任何时间,我刚刚完成它。你可以安排其他任何人(如果没有其他人,你可以再次安排我),如果你愿意的话。Sleep Function MSDN

和引文:

After the sleep interval has passed, the thread is ready to run. If you specify 0 milliseconds, the thread will relinquish the remainder of its time slice but remain ready. Note that a ready thread is not guaranteed to run immediately. Consequently, the thread may not run until some time after the sleep interval elapses. For more information, see Scheduling Priorities.

您需要一些锁定机制来读取和写入来自不同线程的值。

要只有很少的锁定时间,请复制数据。

TClientThread = class( TThread )
private
  FCriticalSection: TCriticalSection;
  ...
end;

//the TClientThread descends from TThread.
procedure TClientThread.SendRequest(ACode: string; ATrialIndex: integer;
  ARequest: string);
begin
  FCriticalSection.Enter;
  try
    FRequest := ARequest;
    FCode := ACode;
    FTrialIndex := IntToStr(ATrialIndex);
  finally
    FCriticalSection.Leave;
  end;
  RTLeventSetEvent(FRTLEvent);
end;

procedure TClientThread.Execute;
var
  lRequest: string;
  AMessage : UTF8String;
begin
  while not Terminated do
    begin
      AMessage := '';
      RTLeventWaitFor(FRTLEvent);

      FCriticalSection.Enter;
      try
        // copy data to local vars
        lRequest := FRequest;
      finally
        FCriticalSection.Leave;
      end;

      FRequester.send( lRequest );
      ...