在 python 中合并多个阻塞生成器函数

Merging multiple blocking generator functions in python

我有两个迭代器。每个代表来自阻塞资源(如套接字)的可能无限的数据流。

我想合并两个迭代器中的数据,按照它到达的顺序——即非确定性的。更详细地说,如果我有迭代器 iter1iter2,我希望我的结果是等同于 merged.

的迭代器
iter1 : 1 2 3     4   5 ...
iter2 :       1 2   3   ... 
merged: 1 2 3 1 2 4 3 5 ...

   --- > increasing time ---> 

我想我需要一个并发程序,但我不确定是否有 pythonic 方法可以做到这一点。我强烈希望答案适用于 Python 2.6.

例如,假设我有两个迭代器,它们 "under the hood" 从套接字读取数据。这是一个快速服务器 "listener",它反复回显客户端连接的 date/time:

==> message.sh <==
#!/usr/bin/env bash
set -e;

# Repeatedly echo the date/time of client connection
MSG=$(date)
while true; do
  echo $MSG;
  sleep 1;
done

==> server.sh <==
#!/usr/bin/env bash
socat TCP-LISTEN:8008,reuseaddr,fork system:"./message.sh"

您可以 运行 服务器 ./server.sh

下面是一个示例 python 脚本,它试图合并来自两个套接字的消息。然而,这是不正确的——它必须从每个迭代器接收一个值才能继续。使用上面的示例,"merged" 结果将是:

iter1 : 1 2 3     4   5 ...
iter2 :       1 2   3   ... 
merged: 1     1 2 2 3 3 4     ...

这是脚本:

#!/usr/bin/env python2
import socket
import time

HOST = "127.0.0.1"
PORT = 8008


def iterate_socket(sock):
    while True:
        yield sock.recv(1024)


def merge(xs, ys):
    iters = [xs, ys]
    while True:
        for it in iters:
            try:
                i = it.next()
                yield i
            except StopIteration:
                pass

sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock1.connect((HOST, PORT))
time.sleep(1)
sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock2.connect((HOST, PORT))

iter1 = iterate_socket(sock1)
iter2 = iterate_socket(sock2)

for msg in merge(iter1, iter2):
    print msg,

最后:我从一个库中获取了迭代器,所以请假设为了这个问题的目的,我必须处理迭代器,并且我不能做一些事情,比如将套接字设置为非阻塞和轮询。

您可以将套接字迭代移动到后台线程,然后使用 Queue 将每个接收到的数据发送到您的主线程。然后你的主线程可以直接使用来自队列的数据:

import socket
import time
from Queue import Queue
from threading import Thread

HOST = "127.0.0.1"
PORT = 8008


def iterate_socket(sock):
    while True:
        data = sock.recv(1024)
        yield data
        if not data: # End of the stream
            return

def consume(q, s):
    for i in s:
        q.put(i)

def merge(xs, ys):
    q = Queue()
    iters = [xs, ys]
    for it in iters:
        t = Thread(target=consume, args=(q, it))
        t.start()

    done = 0
    while True:
        out = q.get()
        if out == '':  # End of the stream.
            done += 1
            if done == len(iters): # When all iters are done, break out.
                return
        else:
            yield out

sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock1.connect((HOST, PORT))
time.sleep(1)
sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock2.connect((HOST, PORT))

iter1 = iterate_socket(sock1)
iter2 = iterate_socket(sock2)

for msg in merge(iter1, iter2):
    print msg,