在 python 中合并多个阻塞生成器函数
Merging multiple blocking generator functions in python
我有两个迭代器。每个代表来自阻塞资源(如套接字)的可能无限的数据流。
我想合并两个迭代器中的数据,按照它到达的顺序——即非确定性的。更详细地说,如果我有迭代器 iter1
和 iter2
,我希望我的结果是等同于 merged
.
的迭代器
iter1 : 1 2 3 4 5 ...
iter2 : 1 2 3 ...
merged: 1 2 3 1 2 4 3 5 ...
--- > increasing time --->
我想我需要一个并发程序,但我不确定是否有 pythonic 方法可以做到这一点。我强烈希望答案适用于 Python 2.6.
例如,假设我有两个迭代器,它们 "under the hood" 从套接字读取数据。这是一个快速服务器 "listener",它反复回显客户端连接的 date/time:
==> message.sh <==
#!/usr/bin/env bash
set -e;
# Repeatedly echo the date/time of client connection
MSG=$(date)
while true; do
echo $MSG;
sleep 1;
done
==> server.sh <==
#!/usr/bin/env bash
socat TCP-LISTEN:8008,reuseaddr,fork system:"./message.sh"
您可以 运行 服务器 ./server.sh
。
下面是一个示例 python 脚本,它试图合并来自两个套接字的消息。然而,这是不正确的——它必须从每个迭代器接收一个值才能继续。使用上面的示例,"merged" 结果将是:
iter1 : 1 2 3 4 5 ...
iter2 : 1 2 3 ...
merged: 1 1 2 2 3 3 4 ...
这是脚本:
#!/usr/bin/env python2
import socket
import time
HOST = "127.0.0.1"
PORT = 8008
def iterate_socket(sock):
while True:
yield sock.recv(1024)
def merge(xs, ys):
iters = [xs, ys]
while True:
for it in iters:
try:
i = it.next()
yield i
except StopIteration:
pass
sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock1.connect((HOST, PORT))
time.sleep(1)
sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock2.connect((HOST, PORT))
iter1 = iterate_socket(sock1)
iter2 = iterate_socket(sock2)
for msg in merge(iter1, iter2):
print msg,
最后:我从一个库中获取了迭代器,所以请假设为了这个问题的目的,我必须处理迭代器,并且我不能做一些事情,比如将套接字设置为非阻塞和轮询。
您可以将套接字迭代移动到后台线程,然后使用 Queue
将每个接收到的数据发送到您的主线程。然后你的主线程可以直接使用来自队列的数据:
import socket
import time
from Queue import Queue
from threading import Thread
HOST = "127.0.0.1"
PORT = 8008
def iterate_socket(sock):
while True:
data = sock.recv(1024)
yield data
if not data: # End of the stream
return
def consume(q, s):
for i in s:
q.put(i)
def merge(xs, ys):
q = Queue()
iters = [xs, ys]
for it in iters:
t = Thread(target=consume, args=(q, it))
t.start()
done = 0
while True:
out = q.get()
if out == '': # End of the stream.
done += 1
if done == len(iters): # When all iters are done, break out.
return
else:
yield out
sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock1.connect((HOST, PORT))
time.sleep(1)
sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock2.connect((HOST, PORT))
iter1 = iterate_socket(sock1)
iter2 = iterate_socket(sock2)
for msg in merge(iter1, iter2):
print msg,
我有两个迭代器。每个代表来自阻塞资源(如套接字)的可能无限的数据流。
我想合并两个迭代器中的数据,按照它到达的顺序——即非确定性的。更详细地说,如果我有迭代器 iter1
和 iter2
,我希望我的结果是等同于 merged
.
iter1 : 1 2 3 4 5 ...
iter2 : 1 2 3 ...
merged: 1 2 3 1 2 4 3 5 ...
--- > increasing time --->
我想我需要一个并发程序,但我不确定是否有 pythonic 方法可以做到这一点。我强烈希望答案适用于 Python 2.6.
例如,假设我有两个迭代器,它们 "under the hood" 从套接字读取数据。这是一个快速服务器 "listener",它反复回显客户端连接的 date/time:
==> message.sh <==
#!/usr/bin/env bash
set -e;
# Repeatedly echo the date/time of client connection
MSG=$(date)
while true; do
echo $MSG;
sleep 1;
done
==> server.sh <==
#!/usr/bin/env bash
socat TCP-LISTEN:8008,reuseaddr,fork system:"./message.sh"
您可以 运行 服务器 ./server.sh
。
下面是一个示例 python 脚本,它试图合并来自两个套接字的消息。然而,这是不正确的——它必须从每个迭代器接收一个值才能继续。使用上面的示例,"merged" 结果将是:
iter1 : 1 2 3 4 5 ...
iter2 : 1 2 3 ...
merged: 1 1 2 2 3 3 4 ...
这是脚本:
#!/usr/bin/env python2
import socket
import time
HOST = "127.0.0.1"
PORT = 8008
def iterate_socket(sock):
while True:
yield sock.recv(1024)
def merge(xs, ys):
iters = [xs, ys]
while True:
for it in iters:
try:
i = it.next()
yield i
except StopIteration:
pass
sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock1.connect((HOST, PORT))
time.sleep(1)
sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock2.connect((HOST, PORT))
iter1 = iterate_socket(sock1)
iter2 = iterate_socket(sock2)
for msg in merge(iter1, iter2):
print msg,
最后:我从一个库中获取了迭代器,所以请假设为了这个问题的目的,我必须处理迭代器,并且我不能做一些事情,比如将套接字设置为非阻塞和轮询。
您可以将套接字迭代移动到后台线程,然后使用 Queue
将每个接收到的数据发送到您的主线程。然后你的主线程可以直接使用来自队列的数据:
import socket
import time
from Queue import Queue
from threading import Thread
HOST = "127.0.0.1"
PORT = 8008
def iterate_socket(sock):
while True:
data = sock.recv(1024)
yield data
if not data: # End of the stream
return
def consume(q, s):
for i in s:
q.put(i)
def merge(xs, ys):
q = Queue()
iters = [xs, ys]
for it in iters:
t = Thread(target=consume, args=(q, it))
t.start()
done = 0
while True:
out = q.get()
if out == '': # End of the stream.
done += 1
if done == len(iters): # When all iters are done, break out.
return
else:
yield out
sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock1.connect((HOST, PORT))
time.sleep(1)
sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock2.connect((HOST, PORT))
iter1 = iterate_socket(sock1)
iter2 = iterate_socket(sock2)
for msg in merge(iter1, iter2):
print msg,