AWS Batch 上的进程间通信

Interprocess communication on AWS Batch

是否可以使用 ZeroMQ 在 AWS Batch 上的作业与本地父进程之间进行进程间通信?如果是这样,我可以参考一个简单的例子吗?我找不到这方面的文档。

我希望 clustermq R 包有一天会支持 AWS Batch 后端:https://github.com/mschubert/clustermq/issues/208clustermq 已经可以向 SLURM 等传统作业调度程序上的工作人员发送 R 函数调用,并使用 ZeroMQ 与工作人员进行通信。

Q : "Is it possible to use ZeroMQ for interprocess communication between a job on AWS Batch and a local parent process?"

ZeroMQ 是一个可行的工具箱,所以唯一的问题是云所有者指定的策略(加上修饰因素瓶颈,因为这些一直是新的奇点相关问题,SPoF如果有人想要...)

分布式计算本身与 ZeroMQ 没有其他问题。


PoC 示例

我们可以在所有远程节点上实例化此代码并让它们 运行 并与您的本地主机通信,确保是否被云所有者的一组策略阻止并且您是否可以让接收方在云引发的海啸 ;o)

已设置连接,以便远程发送者代码“知道”本地主机的 public IP 地址,或类似动机的 SSH 隧道元平面,通过 SSH 端口转发隧道调解互连,从而允许所有云远程发送者(如果没有被云所有者阻止...)实际上到达本地主机私有 IP 地址,就好像它们在同一个私有 LAN 网段中一样。

(为便于交叉移植而提供的 Python 代码)


localhost-receiver 模型示例:

import zmq;         nIOthreads = 4 ### traffic dependent, given an expected ingress Tsunami
aCtx = zmq.Context( nIOthreads )
aSub = aCtx.socket( zmq.XSUB )
aSub.bind( "tcp://<_aPublicIpADDRESS_or_SSL_tunnelled_privateIpADDRESS_>:<_port#_>" )
aSub.setsockopt( zmq.LINGER, 0 )
aSub.setsockopt( zmq.CONFLATE, 0 )
aSub.setsockopt( zmq.SUBSCRIBE, "" )
...
while True:
      try:
           ret = aSub.recv( zmq.NOBLOCK )
           if ret:
                 print( "GOT: {0:}".format( repr( ret ) ) )

      except:
           print( "EXC: will terminate" )
           break

      finally:
           aSub.close()
           aCtx.term()

print( "FIN:  did terminate and process will sysexit." )

云远程发件人模型示例:

import socket
import zmq;         nIOthreads = 1 ### beware of the grooming-factor effects on traffic
aCtx = zmq.Context( nIOthreads )
aPub = aCtx.socket( zmq.XPUB )
aPub.connect( "tcp://<_aPublicIpADDRESS_or_SSL_tunnel_to_privateIpADDRESS_>:<_port#_>" )
aPub.setsockopt( zmq.LINGER, 0 )
aPub.setsockopt( zmq.CONFLATE, 0 )
...
aCloudHostMsgN = 1
aCloudHostNAME = socket.gethostname()
aCloudHostIP   = socket.gethostbyname( aCloudHostNAME )
aMsgMASK       = ( "msg[{0:_>9d}]"
                 + "_from_{0:_>20s}_ip_{1:_>20s}".format( aCloudHotsNAME,
                                                          aCloudHostIP
                                                          )
                   )
while True:
      try:
           aPub.send( aMsgMASK.format( aCloudHostMsgN ) )
           pass;                       aCloudHostMsgN++

      except:
           print( "EXC: will terminate" )
           break

      finally:
           aPub.close()
           aCtx.term()

print( "FIN:  did terminate and process will sysexit." )