PYTHON: 如何从使用 zmq Poller 的服务器连续接收数据?
PYTHON: How to receive data continuously from a server that's using zmq Poller?
我正在使用简单的 requests.post() 模块连接到服务器并接收数据
ack = requests.post('<ip>/get_data, data=data, timeout=10.0, verify=False)
下面是服务器中的一个方法 get_data(),它使用 zmq poller 接收数据,直到获得最终响应。
def get_data():
req = json.loads(self.params.get('data'))
wire = wiring.Wire("indexing_data_pool", zmq_context=g.zmq_context)
try:
close_immediately = True
poll_agent = zmq.Poller()
poll_agent.register(wire.socket, zmq.POLLIN)
wire.send(req)
iteration = 1
while True:
socks = dict(poll_agent.poll())
if wire.socket in socks and socks[wire.socket] == zmq.POLLIN:
res = gevent.with_timeout(10, wire.recv, timeout_value=None)
if res.get('final'):
log.warn('Last Iteration: %s, Length of Rows: %s' %(iteration, len(res['rows'])))
break
else:
log.warn('Iteration: %s, Length of Rows: %s' %(iteration, len(res['rows'])))
next_req = {'search_id': req['search_id'], 'seen_version':res.get('response_version')}
wire.send(next_req)
iteration +=1
finally:
wire.close(immediate=close_immediately)
poll_agent.unregister(wire.socket)
在服务器端,获得的响应记录如下:
2017-08-28_07:17:55.43370 WARNING: Iteration: 1, Length of Rows: 100
2017-08-28_07:17:55.44269 WARNING: Iteration: 2, Length of Rows: 100
2017-08-28_07:17:55.44894 WARNING: Iteration: 3, Length of Rows: 100
2017-08-28_07:17:55.45742 WARNING: Iteration: 4, Length of Rows: 100
2017-08-28_07:17:55.46327 WARNING: Iteration: 5, Length of Rows: 100
2017-08-28_07:17:55.46687 WARNING: Iteration: 6, Length of Rows: 100
2017-08-28_07:17:55.47074 WARNING: Iteration: 7, Length of Rows: 100
2017-08-28_07:17:55.47658 WARNING: Iteration: 8, Length of Rows: 100
2017-08-28_07:17:55.48385 WARNING: Last Iteration: 9, Length of Rows: 75
所以,我假设我在服务器端实现的 zmq 轮询器工作得很好。但是,我很想知道如何将这 9 次数据迭代发送回请求客户端?
P.S. I want to receive the data continuously in client side. You may
suggest appending each batch of response somewhere and sending the
final response back to the client. This won't be feasible when the
response is too big (The requesting client would get timeout)
好的,
我使用 Flask 模块中的 Response 对象处理了这个问题。调用一个单独的方法来连续生成数据并生成最终的 Response 对象并返回它解决了我的问题。
我正在使用简单的 requests.post() 模块连接到服务器并接收数据
ack = requests.post('<ip>/get_data, data=data, timeout=10.0, verify=False)
下面是服务器中的一个方法 get_data(),它使用 zmq poller 接收数据,直到获得最终响应。
def get_data():
req = json.loads(self.params.get('data'))
wire = wiring.Wire("indexing_data_pool", zmq_context=g.zmq_context)
try:
close_immediately = True
poll_agent = zmq.Poller()
poll_agent.register(wire.socket, zmq.POLLIN)
wire.send(req)
iteration = 1
while True:
socks = dict(poll_agent.poll())
if wire.socket in socks and socks[wire.socket] == zmq.POLLIN:
res = gevent.with_timeout(10, wire.recv, timeout_value=None)
if res.get('final'):
log.warn('Last Iteration: %s, Length of Rows: %s' %(iteration, len(res['rows'])))
break
else:
log.warn('Iteration: %s, Length of Rows: %s' %(iteration, len(res['rows'])))
next_req = {'search_id': req['search_id'], 'seen_version':res.get('response_version')}
wire.send(next_req)
iteration +=1
finally:
wire.close(immediate=close_immediately)
poll_agent.unregister(wire.socket)
在服务器端,获得的响应记录如下:
2017-08-28_07:17:55.43370 WARNING: Iteration: 1, Length of Rows: 100
2017-08-28_07:17:55.44269 WARNING: Iteration: 2, Length of Rows: 100
2017-08-28_07:17:55.44894 WARNING: Iteration: 3, Length of Rows: 100
2017-08-28_07:17:55.45742 WARNING: Iteration: 4, Length of Rows: 100
2017-08-28_07:17:55.46327 WARNING: Iteration: 5, Length of Rows: 100
2017-08-28_07:17:55.46687 WARNING: Iteration: 6, Length of Rows: 100
2017-08-28_07:17:55.47074 WARNING: Iteration: 7, Length of Rows: 100
2017-08-28_07:17:55.47658 WARNING: Iteration: 8, Length of Rows: 100
2017-08-28_07:17:55.48385 WARNING: Last Iteration: 9, Length of Rows: 75
所以,我假设我在服务器端实现的 zmq 轮询器工作得很好。但是,我很想知道如何将这 9 次数据迭代发送回请求客户端?
P.S. I want to receive the data continuously in client side. You may suggest appending each batch of response somewhere and sending the final response back to the client. This won't be feasible when the response is too big (The requesting client would get timeout)
好的, 我使用 Flask 模块中的 Response 对象处理了这个问题。调用一个单独的方法来连续生成数据并生成最终的 Response 对象并返回它解决了我的问题。