How to use incoming data stream at a socket for multiple parallel processes in Python?

import socket

ip_addr = '100.100.1.1'
port_num = 5000

socket_obj = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
socket_obj.bind((ip_addr, port_num))
socket_obj.settimeout(2)

I have created this socket object, on which data arrives at a fixed rate. I want to consume that data in real time in two parallel processes.

def process1():
    while True:
        try:
            # Each incoming datagram is exactly 50 bytes.
            new_data = socket_obj.recvfrom(50)
            some_process(new_data)
        except socket.timeout:
            break

def process2():
    while True:
        try:
            new_data = socket_obj.recvfrom(50)
            some_other_process(new_data)
        except socket.timeout:
            break

Running either one of these processes on its own works perfectly, but how do I make sure both processes can run in parallel, reading from the same socket, without any noticeable delay and without losing data in either of the two streams?

The nature of the incoming data is entirely deterministic: exactly 50 bytes of data arrive 1000 times per second. I have set the timeout to 2 seconds so that a process ends once the socket has received no data for 2 seconds.

Also, each process needs access to every packet that arrives at the socket.

I would solve this by creating two Process instances, each passed its own multiprocessing.Queue instance; the main process, which reads from the socket, puts every message it reads onto each process's queue for processing. Writing to and reading from these queues incurs some overhead, which may lower the maximum sustainable processing rate slightly, so the question becomes whether the processing can keep up with the incoming data. But you clearly cannot have each process read the socket in parallel. See the emulation below, which I ran on my desktop and which describes the problem.

import socket
import multiprocessing

def some_process(q):
    while True:
        data = q.get()
        if data is None:
            break
        # process data:
        ...

def some_other_process(q):
    while True:
        data = q.get()
        if data is None:
            break
        # process data:
        ...

def main():
    ip_addr = '100.100.1.1'
    port_num = 5000

    socket_obj = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
    socket_obj.bind((ip_addr, port_num))
    socket_obj.settimeout(2)

    q1 = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=some_process, args=(q1,))
    q2 = multiprocessing.Queue()
    p2 = multiprocessing.Process(target=some_other_process, args=(q2,))
    p1.start()
    p2.start()
    while True:
        try:
            new_data = socket_obj.recvfrom(50)  # each datagram is exactly 50 bytes
        except socket.timeout:
            break
        else:
            q1.put(new_data)
            q2.put(new_data)
    # wait for outstanding tasks to complete:
    q1.put(None)
    q2.put(None)
    p1.join()
    p2.join()

# Required if running under Windows:
if __name__ == '__main__':
    main()
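To exercise this end to end on one machine, a minimal sender along the following lines can feed the receiving socket. This is only a test sketch: the address, port, payload contents, and rate are assumptions matching the figures given in the question.

import socket
import time

def send_stream(ip_addr='100.100.1.1', port_num=5000, n_packets=10_000):
    # Plain UDP sender: 50-byte datagrams at a target rate of 1000/sec.
    sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
    payload = b'x' * 50  # exactly 50 bytes, as stated in the question
    interval = 1 / 1000
    next_send = time.time()
    for _ in range(n_packets):
        sock.sendto(payload, (ip_addr, port_num))
        next_send += interval
        delay = next_send - time.time()
        if delay > 0:
            time.sleep(delay)
    sock.close()

if __name__ == '__main__':
    send_stream()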

Emulation on my desktop

I ran the following emulation on my not-especially-fast desktop to see what rate I could sustain with trivial processing functions, given the overhead of writing these 50-byte data items to and reading them from the multiprocessing queues:

import multiprocessing

def some_process(q):
    while True:
        data = q.get()
        if data is None:
            break
        # process data:
        ...

def some_other_process(q):
    while True:
        data = q.get()
        if data is None:
            break
        # process data:
        ...

def main():
    import time
    q1 = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=some_process, args=(q1,))
    q2 = multiprocessing.Queue()
    p2 = multiprocessing.Process(target=some_other_process, args=(q2,))
    p1.start()
    p2.start()
    t1 = time.time()
    for _ in range(10_000):
        # Next put will be in .001 seconds for a hoped-for rate of 1000/sec.
        expiration = time.time() + .001
        q1.put('xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx')
        q2.put('xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx')
        diff = expiration - time.time()
        if diff > 0:
            time.sleep(diff)
    # wait for outstanding tasks to complete:
    q1.put(None)
    q2.put(None)
    rate = 10_000 / (time.time() - t1)
    print('Done:', rate)
    p1.join()
    p2.join()

# Required if running under Windows:
if __name__ == '__main__':
    main()

Prints:

Done: 614.8320395921962

I could only sustain a rate of 615 messages/second. If messages are written to the queues faster than they can be processed, memory will eventually be exhausted. This would not be a good thing.
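One way to hedge against that failure mode is to bound the queues so the reading loop notices immediately when a consumer falls behind. The sketch below assumes that dropping (and counting) packets is acceptable when that happens; the maxsize of 10_000 is an arbitrary choice that buffers roughly 10 seconds of data at 1000 messages/sec.

import multiprocessing
import queue  # for the queue.Full exception raised by put_nowait

def make_bounded_queue():
    # Bounded queue: put_nowait raises queue.Full once 10_000 items back up.
    return multiprocessing.Queue(maxsize=10_000)

def safe_put(q, item, stats):
    # Non-blocking put: if the consumer has fallen behind, drop the packet
    # and count the drop instead of letting memory grow without limit.
    try:
        q.put_nowait(item)
    except queue.Full:
        stats['dropped'] += 1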

Update

The above emulation seemed somewhat suspect to me. I determined in the following benchmark that I can write to the queues at an extremely high rate (208,317 messages/sec.) and that the messages can be read and processed at a still very high rate (23,094 messages/sec.). I have to conclude that my previous emulation was inaccurate, because the time.sleep function is rather imprecise.

import multiprocessing

def some_process(q):
    while True:
        data = q.get()
        if data is None:
            break
        # process data:
        ...

def some_other_process(q):
    while True:
        data = q.get()
        if data is None:
            break
        # process data:
        ...

def main():
    import time

    q1 = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=some_process, args=(q1,))
    q2 = multiprocessing.Queue()
    p2 = multiprocessing.Process(target=some_other_process, args=(q2,))
    p1.start()
    p2.start()
    t1 = time.time()
    for _ in range(10_000):
        # Write to the queues as fast as possible; this benchmark does no pacing.
        q1.put('xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx')
        q2.put('xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx')
    # wait for outstanding tasks to complete:
    q1.put(None)
    q2.put(None)
    rate = 10_000 / (time.time() - t1)
    print('Done. Put Rate:', rate)
    p1.join()
    p2.join()
    rate = 10_000 / (time.time() - t1)
    print('Done. Processing Rate:', rate)


# Required if running under Windows:
if __name__ == '__main__':
    main()

Prints:

Done. Put Rate: 208317.3903110131
Done. Processing Rate: 23094.772557205524
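A likely source of the earlier inaccuracy is that each iteration re-based its deadline with time.time() + .001, so any overshoot in time.sleep was carried forward and never made up. Pacing against a cumulative schedule avoids that; this is a sketch of the general technique, not code from the benchmarks above:

import time

def paced_loop(n, rate_hz, action):
    # Deadlines are computed from the loop's start time, so an oversleep on
    # one iteration is absorbed by a shorter (or skipped) sleep on the next.
    interval = 1.0 / rate_hz
    start = time.time()
    for i in range(n):
        action(i)
        delay = start + (i + 1) * interval - time.time()
        if delay > 0:
            time.sleep(delay)
    return n / (time.time() - start)

# Example: paced_loop(10_000, 1000, lambda i: None) should report a rate
# much closer to 1000/sec than per-iteration sleeps typically achieve.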