并行(代理)请求并取得最快的结果
Parallel (proxy) request and take the fastest result
我正在尝试优化通过外部(轮换)代理发出的请求。响应有时很快,有时很慢。所以我的想法是针对同一个 URL 并行发送多个请求,取最快的响应并返回其数据,然后直接结束函数,而不等待其他较慢的响应。
python网上有很多关于并行请求的教程和SO问题,但都是针对不同请求的并行请求,而不是重复请求。此外,代码会等待所有请求完成。一旦最快的响应得到响应,我想终止并行请求逻辑(最好以一种干净的方式)。
我的应用基于 Python Flask,使用 Gunicorn + Eventlet 运行。我尝试了 Eventlet green pools 和 Python Concurrent Futures,但使用 Eventlet GreenPool 似乎更合适,因为这段代码将运行在 Gunicorn + Eventlet workers 以及使用 Eventlet workers 的 Celery 中。
我目前正在使用 Luminati 代理管理器 (LPM) 重试失败的请求。旧版本似乎开箱即用地支持并行请求,但当前版本已不再支持此功能。所以我要么在我的 Python 应用程序中用代码解决,要么添加另一个(类似 LPM 的)服务/工具来处理并行请求并选出最快的一个。
代理服务Luminati.io提供了一个'high performance parallel request'代码示例(基于Eventlet Greenpool)。参见 'original example'
我编辑了没有代理和登录的代码,以使其更具可重复性并避免不可预测的代理响应时间。我没有得到 Luminati 的任何支持,所以我正在尝试在 SO 上解决这个问题。
对于此测试,我使用 httpstat.us 提供的模拟 5 秒慢响应和快速响应:
['http://httpstat.us/200?sleep=5000','http://httpstat.us/200']
在编辑后的代码中,我添加了带计时的打印语句,以查看哪个响应先返回。
这段代码有两个问题。有时我可以看到快速响应首先返回并打印响应数据 ('OK'),然后 5 秒后显示慢速响应。但是,通常看起来代码会等到两个响应都返回(两个时间完全相同)。
另一个问题是,虽然我能够立即打印并看到 'fast' 响应的数据,但逻辑仍然会等待所有响应完成。我希望在第一个响应返回后就返回数据并结束函数。在我编辑的代码中,可以看到一些被注释掉的行,那是我终止进程的失败尝试(结果只是重新启动了 eventlet 进程)。
原始示例
import eventlet
from eventlet.green.urllib import request
import random
import socket
# Resolve the super-proxy hostname to an IP address once, at import time;
# the class-level proxy URL template below is built from this value.
super_proxy = socket.gethostbyname('zproxy.lum-superproxy.io')
class SingleSessionRetriever:
    """Fetch URLs through a single proxy session, rotating it when needed.

    A session is one opener bound to a proxy URL that carries a random
    session id.  A fresh session is created after ``requests_limit``
    requests, or once the failure counter reaches ``failures_limit``.
    """

    # Proxy URL template: username, session id, password, then port.
    url = "http://%s-session-%s:%s@"+super_proxy+":%d"
    port = 22225

    def __init__(self, username, password, requests_limit, failures_limit):
        """Store the credentials and limits, then open the first session."""
        self._username = username
        self._password = password
        self._requests_limit = requests_limit
        self._failures_limit = failures_limit
        self._reset_session()

    def _reset_session(self):
        """Build an opener with a new random session id and zero the counters."""
        session_id = random.random()
        proxy = SingleSessionRetriever.url % (
            self._username, session_id, self._password,
            SingleSessionRetriever.port)
        proxy_handler = request.ProxyHandler({'http': proxy, 'https': proxy})
        self._opener = request.build_opener(proxy_handler)
        self._requests = 0
        self._failures = 0

    def retrieve(self, url, timeout):
        """Return the body of ``url``, retrying until one attempt succeeds.

        Args:
            url: Address to fetch through the proxy.
            timeout: Budget for a single attempt, in seconds, handed to
                ``eventlet.Timeout``.

        Returns:
            The raw response body as returned by ``read()``.

        Note:
            There is no upper bound on the number of attempts; failed or
            timed-out attempts are retried indefinitely.
        """
        while True:
            if self._requests == self._requests_limit:
                self._reset_session()
            self._requests += 1
            # Arm the timer before entering ``try`` so ``finally`` can always
            # cancel it.
            timer = eventlet.Timeout(timeout)
            try:
                return self._opener.open(url).read()
            except (Exception, eventlet.Timeout):
                # eventlet.Timeout derives from BaseException, so it must be
                # listed explicitly.  A bare ``except:`` would also swallow
                # GreenletExit / KeyboardInterrupt, which turns an attempt to
                # kill this greenlet into just another retry.
                self._failures += 1
                if self._failures == self._failures_limit:
                    self._reset_session()
            finally:
                timer.cancel()
class MultiSessionRetriever:
    """Run many retrievals concurrently over a reusable stack of sessions."""

    def __init__(self, username, password, session_requests_limit, session_failures_limit):
        """Keep the credentials and per-session limits; start with no idle sessions."""
        self._sessions_stack = []
        self._username, self._password = username, password
        self._session_requests_limit = session_requests_limit
        self._session_failures_limit = session_failures_limit

    def retrieve(self, urls, timeout, parallel_sessions_limit, callback):
        """Fetch every URL on a green pool and pass each (url, body) to ``callback``."""
        def fetch_one(target):
            return self._retrieve_single(target, timeout)

        workers = eventlet.GreenPool(parallel_sessions_limit)
        for fetched_url, body in workers.imap(fetch_one, urls):
            callback(fetched_url, body)

    def _retrieve_single(self, url, timeout):
        """Fetch ``url`` with an idle session (or a new one); return ``(url, body)``."""
        if not self._sessions_stack:
            session = SingleSessionRetriever(
                self._username, self._password,
                self._session_requests_limit, self._session_failures_limit)
        else:
            session = self._sessions_stack.pop()
        body = session.retrieve(url, timeout)
        # Hand the session back so a later request can reuse it.
        self._sessions_stack.append(session)
        return url, body
def output(url, body):
    """Callback for retrieved pages: print the body; the URL is not used."""
    print(body)
# Number of times the test URL is requested.
n_total_req = 100
# Per-attempt timeout handed to retrieve() (eventlet.Timeout takes seconds).
req_timeout = 10
# Size of the green pool, i.e. how many retrievals run concurrently.
n_parallel_exit_nodes = 10
# Requests served by one session before it is reset.
switch_ip_every_n_req = 10
# Failure count at which a session is reset.
max_failures = 2
# NOTE(review): the proxy username and password are hard-coded here; real
# credentials should not be kept in source.
MultiSessionRetriever('lum-customer-c_ba028d72-zone-static', 'akssw3iy6h3y', switch_ip_every_n_req, max_failures).retrieve(
["http://lumtest.com/myip.json"] * n_total_req, req_timeout, n_parallel_exit_nodes, output)
编辑代码(没有登录和代理)
def high_perf_parallel_requests(search_url):
    """Request the two test URLs in parallel and return every response body.

    Proxy-free, print-instrumented variant of the vendor example, used to
    observe in which order the fast and the slow response come back.

    Args:
        search_url: Not used yet; the two httpstat.us test URLs are
            hard-coded below.

    Returns:
        list: The response bodies in the order ``pool.imap`` hands them to
        the callback.
    """
    import datetime

    # Imported locally so the function does not depend on a module-level
    # ``import eventlet`` from another snippet.
    import eventlet
    from eventlet.green.urllib import request

    results2 = []  # bodies in the order the callback receives them
    results1 = []  # bodies in the order the requests actually complete

    class SingleSessionRetriever:
        """Stripped-down session: no proxy, only request/failure counters."""

        def __init__(self, username, password, requests_limit, failures_limit):
            self._username = username
            self._password = password
            self._requests_limit = requests_limit
            self._failures_limit = failures_limit
            self._reset_session()

        def _reset_session(self):
            """Zero the request and failure counters."""
            self._requests = 0
            self._failures = 0

        def retrieve(self, url, timeout):
            """Return the body of ``url``, retrying until an attempt succeeds."""
            print("\n SingleSessionRetriever.retrieve init")
            print(url)
            print(datetime.datetime.now())
            while True:
                if self._requests == self._requests_limit:
                    self._reset_session()
                self._requests += 1
                # Arm the timer before ``try`` so ``finally`` can always
                # cancel it.
                timer = eventlet.Timeout(timeout)
                try:
                    result = request.urlopen(url).read()
                    print("\n SingleSessionRetriever.retrieve result")
                    print(url)
                    print(result)
                    print(datetime.datetime.now())
                    results1.append(result)
                    # Unsuccessful attempts to stop the remaining requests
                    # from here, kept for reference:
                    # eventlet.kill(pool)
                    # raise Exception("Got fastest result. Kill eventlet")
                    # eventlet.kill(self)
                    # pool.kill()
                    return result
                except (Exception, eventlet.Timeout):
                    # eventlet.Timeout derives from BaseException, so it is
                    # listed explicitly.  A bare ``except:`` would also
                    # swallow GreenletExit, so killing this greenlet would
                    # only trigger another retry.
                    self._failures += 1
                    if self._failures == self._failures_limit:
                        self._reset_session()
                finally:
                    timer.cancel()

    class MultiSessionRetriever:
        """Fan the URLs out over a green pool, reusing idle sessions."""

        def __init__(self, username, password, session_requests_limit, session_failures_limit):
            self._returned = False
            self._username = username
            self._password = password
            self._sessions_stack = []
            self._session_requests_limit = session_requests_limit
            self._session_failures_limit = session_failures_limit

        def retrieve(self, urls, timeout, parallel_sessions_limit, callback):
            """Fetch every URL and pass each (url, body) pair to ``callback``."""
            pool = eventlet.GreenPool(parallel_sessions_limit)
            try:
                # Earlier attempt, kept for reference:
                # for url in urls:
                #     print("spawn {}".format(url))
                #     pool.spawn_n(self._retrieve_single(url, timeout))
                # pool.waitall()
                for url, body in pool.imap(lambda url: self._retrieve_single(url, timeout), urls):
                    if body:
                        print("\n MultiSessionRetriever.retrieve: Body received")
                        print(datetime.datetime.now())
                        # Unsuccessful attempts to return early, kept for
                        # reference:
                        # eventlet.Event.send_exception
                        # return body
                        # eventlet.kill(self)
                        # pool.kill()
                    print("\n MultiSessionRetriever.retrieve: in for loop")
                    print(url)
                    print(body)
                    print(datetime.datetime.now())
                    callback(url, body)
            except Exception as e:
                # eventlet.kill(pool)
                # eventlet.kill(self)
                print(e)
            print("\n MultiSessionRetriever.retrieve: after loop")
            print(datetime.datetime.now())
            # eventlet.kill(self)

        def _retrieve_single(self, url, timeout):
            """Fetch ``url`` with an idle session (or a new one); return ``(url, body)``."""
            print("\n MultiSessionRetriever._retrieve_single url:")
            print(url)
            print(datetime.datetime.now())
            if self._sessions_stack:
                session = self._sessions_stack.pop()
            else:
                session = SingleSessionRetriever(
                    self._username, self._password,
                    self._session_requests_limit, self._session_failures_limit)
            body = session.retrieve(url, timeout)
            print("\n MultiSessionRetriever._retrieve_single body:")
            print(body)
            print(datetime.datetime.now())
            self._sessions_stack.append(session)
            return url, body

    def output(url, body):
        """Callback: log the pair and remember the body in ``results2``."""
        print("\n MultiSessionRetriever.output:")
        print(url)
        print(body)
        print(datetime.datetime.now())
        results2.append(body)

    req_timeout = 10
    n_parallel_exit_nodes = 2
    switch_ip_every_n_req = 1
    max_failures = 2
    urls = ['http://httpstat.us/200?sleep=5000','http://httpstat.us/200']
    print("start")
    print(datetime.datetime.now())
    MultiSessionRetriever('', '', switch_ip_every_n_req, max_failures).retrieve(
        urls, req_timeout, n_parallel_exit_nodes, output)
    print("result1:")
    print(results1)
    print("result2:")
    print(results2)
    return results2
控制台输出(这里我使用了另外两个 URL,它们返回的响应文本分别标明是快速响应还是慢速响应)。
web_1 | high_perf_parallel_requests: start
web_1 | start
web_1 | 2021-02-04 02:28:17.503574
web_1 |
web_1 | MultiSessionRetriever._retrieve_single url:
web_1 | http://httpstat.us/200?sleep=5000
web_1 | 2021-02-04 02:28:17.503903
web_1 |
web_1 | SingleSessionRetriever.retrieve init
web_1 | http://httpstat.us/200?sleep=5000
web_1 | 2021-02-04 02:28:17.503948
web_1 |
web_1 | MultiSessionRetriever._retrieve_single url:
web_1 | http://httpstat.us/200
web_1 | 2021-02-04 02:28:17.511720
web_1 |
web_1 | SingleSessionRetriever.retrieve init
web_1 | http://httpstat.us/200
web_1 | 2021-02-04 02:28:17.511783
web_1 |
web_1 | SingleSessionRetriever.retrieve result
web_1 | http://httpstat.us/200
web_1 | b'"fast response result"\n'
web_1 | 2021-02-04 02:28:18.269042
web_1 |
web_1 | MultiSessionRetriever._retrieve_single body:
web_1 | b'"fast response result"\n'
web_1 | 2021-02-04 02:28:18.269220
web_1 |
web_1 | SingleSessionRetriever.retrieve result
web_1 | http://httpstat.us/200?sleep=5000
web_1 | b'"slow response result"\n'
web_1 | 2021-02-04 02:28:24.458372
web_1 |
web_1 | MultiSessionRetriever._retrieve_single body:
web_1 | b'"slow response result"\n'
web_1 | 2021-02-04 02:28:24.458499
web_1 |
web_1 | MultiSessionRetriever.retrieve: Body received
web_1 | 2021-02-04 02:28:24.458814
web_1 |
web_1 | MultiSessionRetriever.retrieve: in for loop
web_1 | http://httpstat.us/200?sleep=5000
web_1 | b'"slow response result"\n'
web_1 | 2021-02-04 02:28:24.458857
web_1 |
web_1 | MultiSessionRetriever.output:
web_1 | http://httpstat.us/200?sleep=5000
web_1 | b'"slow response result"\n'
web_1 | 2021-02-04 02:28:24.458918
web_1 |
web_1 | MultiSessionRetriever.retrieve: Body received
web_1 | 2021-02-04 02:28:24.459057
web_1 |
web_1 | MultiSessionRetriever.retrieve: in for loop
web_1 | http://httpstat.us/200
web_1 | b'"fast response result"\n'
web_1 | 2021-02-04 02:28:24.459158
web_1 |
web_1 | MultiSessionRetriever.output:
web_1 | http://httpstat.us/200
web_1 | b'"fast response result"\n'
web_1 | 2021-02-04 02:28:24.459206
web_1 |
web_1 | MultiSessionRetriever.retrieve: after loop
web_1 | 2021-02-04 02:28:24.459482
web_1 | result1
web_1 | [b'"fast response result"\n', b'"slow response result"\n']
web_1 | result2
web_1 | [b'"slow response result"\n', b'"fast response result"\n']
web_1 | Parallel resp = [b'"slow response result"\n', b'"fast response result"\n']
Eventlet 和 Concurrent Futures 的其他尝试
def parallel_request(url):
    """Fetch the two test URLs on an eventlet green pool and report the fastest.

    Args:
        url: Not used yet; the httpstat.us test URLs are hard-coded below.

    Returns:
        None. Progress and the fastest body are printed.
    """
    fastest_result = None
    try:
        import datetime
        import eventlet
        from eventlet.green.urllib.request import urlopen

        urls = ['http://httpstat.us/200?sleep=5000','http://httpstat.us/200']

        def fetch(url):
            """Download ``url``; the first download to finish is the fastest."""
            nonlocal fastest_result
            print("\n Fetch start")
            print(url)
            print(datetime.datetime.now())
            result = urlopen(url).read()
            print("\n Fetch result")
            print(result)
            print(datetime.datetime.now())
            # Previously ``fastest_result`` was never assigned, so the final
            # report always showed None.  Only the first fetch to complete
            # records its body here.
            if fastest_result is None:
                fastest_result = result
            return result

        pool = eventlet.GreenPool()
        print("\n Parallel start")
        print(datetime.datetime.now())
        # imap hands the bodies back one per input URL, so this loop still
        # runs until every request has finished.
        for body in pool.imap(fetch, urls):
            print("\n Pool result")
            print(body)
            print(datetime.datetime.now())
        print("\n Parallel end")
        print(datetime.datetime.now())
    except Exception as e:
        print(e)
    print("Fastest result= {}".format(fastest_result))
Concurrent Futures
def request_futures(url):
    """Fetch the two test URLs on a thread pool, logging each step with a timestamp.

    The ``url`` argument is not read; the httpstat.us test URLs are
    hard-coded.  Nothing is returned, and any exception is printed.
    """
    try:
        import datetime
        import concurrent.futures
        import urllib.request

        now = datetime.datetime.now
        urls = ['http://httpstat.us/200?sleep=5000','http://httpstat.us/200']
        print("\n Start Futures")
        print(now())

        def load_url(address, timeout):
            """Download one page and return its body."""
            with urllib.request.urlopen(address, timeout=timeout) as response:
                print("\n load url")
                print(now())
                payload = response.read()
                print(payload)
                print(now())
                return payload

        # The with statement makes sure the worker threads are cleaned up.
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Submit one download per URL and remember which future is which.
            pending = {}
            for address in urls:
                pending[executor.submit(load_url, address, 60)] = address
            for finished in concurrent.futures.as_completed(pending):
                print("\n Iterate future")
                print(now())
                url = pending[finished]
                try:
                    print("\n Try future")
                    print(url)
                    print(now())
                    data = finished.result()
                    print("\n Data future")
                    print(data)
                    print(now())
                except Exception as exc:
                    print('%r generated an exception: %s' % (url, exc))
                else:
                    print('%r page is %d bytes' % (url, len(data)))
        print("\n End Futures")
        print(now())
    except Exception as e:
        print(e)
我把事情搞得太复杂了,发现最简单的方法是在 Celery 后台工作程序(我已经在使用)中通过多个任务发送并行 url 请求。 Celery background worker 使用 Eventlet 和多个 worker 来处理大量并发任务(尤其是有很多 I/O 等待时间)
使用下面的代码,我用相同的 URL 调用 Celery 任务两次,然后每隔 x 毫秒检查一次是否有某个请求已经完成。如果有,就采用最先完成的请求并取消另一个 Celery 任务。此方案的唯一限制是:在使用 Eventlet 运行时,Celery 不支持彻底终止任务。将来,我可能会通过在 Redis 中使用一个键来改进这一点,让两个并行任务检查对方是否已经完成;如果是,就可以取消剩余的任务。
from datetime import datetime
from time import sleep

from app.blueprints.api.v1.tasks import parallel_request

# NOTE(review): search_url, proxy0 and proxy4 are not defined in this snippet;
# they are expected to come from the surrounding code.
t_start = datetime.now()

# Launch the same request twice as parallel Celery background tasks.
job1 = parallel_request.apply_async(args=[search_url])
job2 = parallel_request.apply_async(args=[search_url])

# Poll both jobs until one of them reports ready; there is no upper bound on
# the wait.
while True:
    if job1.ready():
        print("Parallel job 1 finished first")
        job, job_cancel, proxy = job1, job2, proxy0
        break
    if job2.ready():
        print("Parallel job 2 finished first")
        job, job_cancel, proxy = job2, job1, proxy4
        break
    # Neither job is done yet: wait 100 ms before checking again.
    sleep(0.1)

t_end = datetime.now()
proxy_time = int((t_end - t_start).total_seconds() * 1000)
print("Result in {} ms".format(proxy_time))
data = job.get()

# Revoke the slower duplicate through its own AsyncResult. Terminate/SIGKILL
# does not work when Celery runs on Eventlet.
job_cancel.revoke()
我正在尝试优化通过外部(轮换)代理发出的请求。响应有时很快,有时很慢。所以我的想法是针对同一个 URL 并行发送多个请求,取最快的响应并返回其数据,然后直接结束函数,而不等待其他较慢的响应。
python网上有很多关于并行请求的教程和SO问题,但都是针对不同请求的并行请求,而不是重复请求。此外,代码会等待所有请求完成。一旦最快的响应得到响应,我想终止并行请求逻辑(最好以一种干净的方式)。
我的应用基于 Python Flask,使用 Gunicorn + Eventlet 运行。我尝试了 Eventlet green pools 和 Python Concurrent Futures,但使用 Eventlet GreenPool 似乎更合适,因为这段代码将运行在 Gunicorn + Eventlet workers 以及使用 Eventlet workers 的 Celery 中。
我目前正在使用 Luminati 代理管理器 (LPM) 重试失败的请求。旧版本似乎开箱即用地支持并行请求,但当前版本已不再支持此功能。所以我要么在我的 Python 应用程序中用代码解决,要么添加另一个(类似 LPM 的)服务/工具来处理并行请求并选出最快的一个。
代理服务Luminati.io提供了一个'high performance parallel request'代码示例(基于Eventlet Greenpool)。参见 'original example'
我编辑了没有代理和登录的代码,以使其更具可重复性并避免不可预测的代理响应时间。我没有得到 Luminati 的任何支持,所以我正在尝试在 SO 上解决这个问题。 对于此测试,我使用 httpstat.us 提供的模拟 5 秒慢响应和快速响应:
['http://httpstat.us/200?sleep=5000','http://httpstat.us/200']
在编辑后的代码中,我添加了带计时的打印语句,以查看哪个响应先返回。 这段代码有两个问题。有时我可以看到快速响应首先返回并打印响应数据 ('OK'),然后 5 秒后显示慢速响应。但是,通常看起来代码会等到两个响应都返回(两个时间完全相同)。
另一个问题是,虽然我能够立即打印并看到 'fast' 响应的数据,但逻辑仍然会等待所有响应完成。我希望在第一个响应返回后就返回数据并结束函数。在我编辑的代码中,可以看到一些被注释掉的行,那是我终止进程的失败尝试(结果只是重新启动了 eventlet 进程)。
原始示例
import eventlet
from eventlet.green.urllib import request
import random
import socket
# Resolve the super-proxy hostname to an IP address once, at import time;
# the class-level proxy URL template below is built from this value.
super_proxy = socket.gethostbyname('zproxy.lum-superproxy.io')
class SingleSessionRetriever:
    """Fetch URLs through a single proxy session, rotating it when needed.

    A session is one opener bound to a proxy URL that carries a random
    session id.  A fresh session is created after ``requests_limit``
    requests, or once the failure counter reaches ``failures_limit``.
    """

    # Proxy URL template: username, session id, password, then port.
    url = "http://%s-session-%s:%s@"+super_proxy+":%d"
    port = 22225

    def __init__(self, username, password, requests_limit, failures_limit):
        """Store the credentials and limits, then open the first session."""
        self._username = username
        self._password = password
        self._requests_limit = requests_limit
        self._failures_limit = failures_limit
        self._reset_session()

    def _reset_session(self):
        """Build an opener with a new random session id and zero the counters."""
        session_id = random.random()
        proxy = SingleSessionRetriever.url % (
            self._username, session_id, self._password,
            SingleSessionRetriever.port)
        proxy_handler = request.ProxyHandler({'http': proxy, 'https': proxy})
        self._opener = request.build_opener(proxy_handler)
        self._requests = 0
        self._failures = 0

    def retrieve(self, url, timeout):
        """Return the body of ``url``, retrying until one attempt succeeds.

        Args:
            url: Address to fetch through the proxy.
            timeout: Budget for a single attempt, in seconds, handed to
                ``eventlet.Timeout``.

        Returns:
            The raw response body as returned by ``read()``.

        Note:
            There is no upper bound on the number of attempts; failed or
            timed-out attempts are retried indefinitely.
        """
        while True:
            if self._requests == self._requests_limit:
                self._reset_session()
            self._requests += 1
            # Arm the timer before entering ``try`` so ``finally`` can always
            # cancel it.
            timer = eventlet.Timeout(timeout)
            try:
                return self._opener.open(url).read()
            except (Exception, eventlet.Timeout):
                # eventlet.Timeout derives from BaseException, so it must be
                # listed explicitly.  A bare ``except:`` would also swallow
                # GreenletExit / KeyboardInterrupt, which turns an attempt to
                # kill this greenlet into just another retry.
                self._failures += 1
                if self._failures == self._failures_limit:
                    self._reset_session()
            finally:
                timer.cancel()
class MultiSessionRetriever:
    """Run many retrievals concurrently over a reusable stack of sessions."""

    def __init__(self, username, password, session_requests_limit, session_failures_limit):
        """Keep the credentials and per-session limits; start with no idle sessions."""
        self._sessions_stack = []
        self._username, self._password = username, password
        self._session_requests_limit = session_requests_limit
        self._session_failures_limit = session_failures_limit

    def retrieve(self, urls, timeout, parallel_sessions_limit, callback):
        """Fetch every URL on a green pool and pass each (url, body) to ``callback``."""
        def fetch_one(target):
            return self._retrieve_single(target, timeout)

        workers = eventlet.GreenPool(parallel_sessions_limit)
        for fetched_url, body in workers.imap(fetch_one, urls):
            callback(fetched_url, body)

    def _retrieve_single(self, url, timeout):
        """Fetch ``url`` with an idle session (or a new one); return ``(url, body)``."""
        if not self._sessions_stack:
            session = SingleSessionRetriever(
                self._username, self._password,
                self._session_requests_limit, self._session_failures_limit)
        else:
            session = self._sessions_stack.pop()
        body = session.retrieve(url, timeout)
        # Hand the session back so a later request can reuse it.
        self._sessions_stack.append(session)
        return url, body
def output(url, body):
    """Callback for retrieved pages: print the body; the URL is not used."""
    print(body)
# Number of times the test URL is requested.
n_total_req = 100
# Per-attempt timeout handed to retrieve() (eventlet.Timeout takes seconds).
req_timeout = 10
# Size of the green pool, i.e. how many retrievals run concurrently.
n_parallel_exit_nodes = 10
# Requests served by one session before it is reset.
switch_ip_every_n_req = 10
# Failure count at which a session is reset.
max_failures = 2
# NOTE(review): the proxy username and password are hard-coded here; real
# credentials should not be kept in source.
MultiSessionRetriever('lum-customer-c_ba028d72-zone-static', 'akssw3iy6h3y', switch_ip_every_n_req, max_failures).retrieve(
["http://lumtest.com/myip.json"] * n_total_req, req_timeout, n_parallel_exit_nodes, output)
编辑代码(没有登录和代理)
def high_perf_parallel_requests(search_url):
    """Request the two test URLs in parallel and return every response body.

    Proxy-free, print-instrumented variant of the vendor example, used to
    observe in which order the fast and the slow response come back.

    Args:
        search_url: Not used yet; the two httpstat.us test URLs are
            hard-coded below.

    Returns:
        list: The response bodies in the order ``pool.imap`` hands them to
        the callback.
    """
    import datetime

    # Imported locally so the function does not depend on a module-level
    # ``import eventlet`` from another snippet.
    import eventlet
    from eventlet.green.urllib import request

    results2 = []  # bodies in the order the callback receives them
    results1 = []  # bodies in the order the requests actually complete

    class SingleSessionRetriever:
        """Stripped-down session: no proxy, only request/failure counters."""

        def __init__(self, username, password, requests_limit, failures_limit):
            self._username = username
            self._password = password
            self._requests_limit = requests_limit
            self._failures_limit = failures_limit
            self._reset_session()

        def _reset_session(self):
            """Zero the request and failure counters."""
            self._requests = 0
            self._failures = 0

        def retrieve(self, url, timeout):
            """Return the body of ``url``, retrying until an attempt succeeds."""
            print("\n SingleSessionRetriever.retrieve init")
            print(url)
            print(datetime.datetime.now())
            while True:
                if self._requests == self._requests_limit:
                    self._reset_session()
                self._requests += 1
                # Arm the timer before ``try`` so ``finally`` can always
                # cancel it.
                timer = eventlet.Timeout(timeout)
                try:
                    result = request.urlopen(url).read()
                    print("\n SingleSessionRetriever.retrieve result")
                    print(url)
                    print(result)
                    print(datetime.datetime.now())
                    results1.append(result)
                    # Unsuccessful attempts to stop the remaining requests
                    # from here, kept for reference:
                    # eventlet.kill(pool)
                    # raise Exception("Got fastest result. Kill eventlet")
                    # eventlet.kill(self)
                    # pool.kill()
                    return result
                except (Exception, eventlet.Timeout):
                    # eventlet.Timeout derives from BaseException, so it is
                    # listed explicitly.  A bare ``except:`` would also
                    # swallow GreenletExit, so killing this greenlet would
                    # only trigger another retry.
                    self._failures += 1
                    if self._failures == self._failures_limit:
                        self._reset_session()
                finally:
                    timer.cancel()

    class MultiSessionRetriever:
        """Fan the URLs out over a green pool, reusing idle sessions."""

        def __init__(self, username, password, session_requests_limit, session_failures_limit):
            self._returned = False
            self._username = username
            self._password = password
            self._sessions_stack = []
            self._session_requests_limit = session_requests_limit
            self._session_failures_limit = session_failures_limit

        def retrieve(self, urls, timeout, parallel_sessions_limit, callback):
            """Fetch every URL and pass each (url, body) pair to ``callback``."""
            pool = eventlet.GreenPool(parallel_sessions_limit)
            try:
                # Earlier attempt, kept for reference:
                # for url in urls:
                #     print("spawn {}".format(url))
                #     pool.spawn_n(self._retrieve_single(url, timeout))
                # pool.waitall()
                for url, body in pool.imap(lambda url: self._retrieve_single(url, timeout), urls):
                    if body:
                        print("\n MultiSessionRetriever.retrieve: Body received")
                        print(datetime.datetime.now())
                        # Unsuccessful attempts to return early, kept for
                        # reference:
                        # eventlet.Event.send_exception
                        # return body
                        # eventlet.kill(self)
                        # pool.kill()
                    print("\n MultiSessionRetriever.retrieve: in for loop")
                    print(url)
                    print(body)
                    print(datetime.datetime.now())
                    callback(url, body)
            except Exception as e:
                # eventlet.kill(pool)
                # eventlet.kill(self)
                print(e)
            print("\n MultiSessionRetriever.retrieve: after loop")
            print(datetime.datetime.now())
            # eventlet.kill(self)

        def _retrieve_single(self, url, timeout):
            """Fetch ``url`` with an idle session (or a new one); return ``(url, body)``."""
            print("\n MultiSessionRetriever._retrieve_single url:")
            print(url)
            print(datetime.datetime.now())
            if self._sessions_stack:
                session = self._sessions_stack.pop()
            else:
                session = SingleSessionRetriever(
                    self._username, self._password,
                    self._session_requests_limit, self._session_failures_limit)
            body = session.retrieve(url, timeout)
            print("\n MultiSessionRetriever._retrieve_single body:")
            print(body)
            print(datetime.datetime.now())
            self._sessions_stack.append(session)
            return url, body

    def output(url, body):
        """Callback: log the pair and remember the body in ``results2``."""
        print("\n MultiSessionRetriever.output:")
        print(url)
        print(body)
        print(datetime.datetime.now())
        results2.append(body)

    req_timeout = 10
    n_parallel_exit_nodes = 2
    switch_ip_every_n_req = 1
    max_failures = 2
    urls = ['http://httpstat.us/200?sleep=5000','http://httpstat.us/200']
    print("start")
    print(datetime.datetime.now())
    MultiSessionRetriever('', '', switch_ip_every_n_req, max_failures).retrieve(
        urls, req_timeout, n_parallel_exit_nodes, output)
    print("result1:")
    print(results1)
    print("result2:")
    print(results2)
    return results2
控制台输出(这里我使用了另外两个 URL,它们返回的响应文本分别标明是快速响应还是慢速响应)。
web_1 | high_perf_parallel_requests: start
web_1 | start
web_1 | 2021-02-04 02:28:17.503574
web_1 |
web_1 | MultiSessionRetriever._retrieve_single url:
web_1 | http://httpstat.us/200?sleep=5000
web_1 | 2021-02-04 02:28:17.503903
web_1 |
web_1 | SingleSessionRetriever.retrieve init
web_1 | http://httpstat.us/200?sleep=5000
web_1 | 2021-02-04 02:28:17.503948
web_1 |
web_1 | MultiSessionRetriever._retrieve_single url:
web_1 | http://httpstat.us/200
web_1 | 2021-02-04 02:28:17.511720
web_1 |
web_1 | SingleSessionRetriever.retrieve init
web_1 | http://httpstat.us/200
web_1 | 2021-02-04 02:28:17.511783
web_1 |
web_1 | SingleSessionRetriever.retrieve result
web_1 | http://httpstat.us/200
web_1 | b'"fast response result"\n'
web_1 | 2021-02-04 02:28:18.269042
web_1 |
web_1 | MultiSessionRetriever._retrieve_single body:
web_1 | b'"fast response result"\n'
web_1 | 2021-02-04 02:28:18.269220
web_1 |
web_1 | SingleSessionRetriever.retrieve result
web_1 | http://httpstat.us/200?sleep=5000
web_1 | b'"slow response result"\n'
web_1 | 2021-02-04 02:28:24.458372
web_1 |
web_1 | MultiSessionRetriever._retrieve_single body:
web_1 | b'"slow response result"\n'
web_1 | 2021-02-04 02:28:24.458499
web_1 |
web_1 | MultiSessionRetriever.retrieve: Body received
web_1 | 2021-02-04 02:28:24.458814
web_1 |
web_1 | MultiSessionRetriever.retrieve: in for loop
web_1 | http://httpstat.us/200?sleep=5000
web_1 | b'"slow response result"\n'
web_1 | 2021-02-04 02:28:24.458857
web_1 |
web_1 | MultiSessionRetriever.output:
web_1 | http://httpstat.us/200?sleep=5000
web_1 | b'"slow response result"\n'
web_1 | 2021-02-04 02:28:24.458918
web_1 |
web_1 | MultiSessionRetriever.retrieve: Body received
web_1 | 2021-02-04 02:28:24.459057
web_1 |
web_1 | MultiSessionRetriever.retrieve: in for loop
web_1 | http://httpstat.us/200
web_1 | b'"fast response result"\n'
web_1 | 2021-02-04 02:28:24.459158
web_1 |
web_1 | MultiSessionRetriever.output:
web_1 | http://httpstat.us/200
web_1 | b'"fast response result"\n'
web_1 | 2021-02-04 02:28:24.459206
web_1 |
web_1 | MultiSessionRetriever.retrieve: after loop
web_1 | 2021-02-04 02:28:24.459482
web_1 | result1
web_1 | [b'"fast response result"\n', b'"slow response result"\n']
web_1 | result2
web_1 | [b'"slow response result"\n', b'"fast response result"\n']
web_1 | Parallel resp = [b'"slow response result"\n', b'"fast response result"\n']
Eventlet 和 Concurrent Futures 的其他尝试
def parallel_request(url):
    """Fetch the two test URLs on an eventlet green pool and report the fastest.

    Args:
        url: Not used yet; the httpstat.us test URLs are hard-coded below.

    Returns:
        None. Progress and the fastest body are printed.
    """
    fastest_result = None
    try:
        import datetime
        import eventlet
        from eventlet.green.urllib.request import urlopen

        urls = ['http://httpstat.us/200?sleep=5000','http://httpstat.us/200']

        def fetch(url):
            """Download ``url``; the first download to finish is the fastest."""
            nonlocal fastest_result
            print("\n Fetch start")
            print(url)
            print(datetime.datetime.now())
            result = urlopen(url).read()
            print("\n Fetch result")
            print(result)
            print(datetime.datetime.now())
            # Previously ``fastest_result`` was never assigned, so the final
            # report always showed None.  Only the first fetch to complete
            # records its body here.
            if fastest_result is None:
                fastest_result = result
            return result

        pool = eventlet.GreenPool()
        print("\n Parallel start")
        print(datetime.datetime.now())
        # imap hands the bodies back one per input URL, so this loop still
        # runs until every request has finished.
        for body in pool.imap(fetch, urls):
            print("\n Pool result")
            print(body)
            print(datetime.datetime.now())
        print("\n Parallel end")
        print(datetime.datetime.now())
    except Exception as e:
        print(e)
    print("Fastest result= {}".format(fastest_result))
Concurrent Futures
def request_futures(url):
    """Fetch the two test URLs on a thread pool, logging each step with a timestamp.

    The ``url`` argument is not read; the httpstat.us test URLs are
    hard-coded.  Nothing is returned, and any exception is printed.
    """
    try:
        import datetime
        import concurrent.futures
        import urllib.request

        now = datetime.datetime.now
        urls = ['http://httpstat.us/200?sleep=5000','http://httpstat.us/200']
        print("\n Start Futures")
        print(now())

        def load_url(address, timeout):
            """Download one page and return its body."""
            with urllib.request.urlopen(address, timeout=timeout) as response:
                print("\n load url")
                print(now())
                payload = response.read()
                print(payload)
                print(now())
                return payload

        # The with statement makes sure the worker threads are cleaned up.
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Submit one download per URL and remember which future is which.
            pending = {}
            for address in urls:
                pending[executor.submit(load_url, address, 60)] = address
            for finished in concurrent.futures.as_completed(pending):
                print("\n Iterate future")
                print(now())
                url = pending[finished]
                try:
                    print("\n Try future")
                    print(url)
                    print(now())
                    data = finished.result()
                    print("\n Data future")
                    print(data)
                    print(now())
                except Exception as exc:
                    print('%r generated an exception: %s' % (url, exc))
                else:
                    print('%r page is %d bytes' % (url, len(data)))
        print("\n End Futures")
        print(now())
    except Exception as e:
        print(e)
我把事情搞得太复杂了,发现最简单的方法是在 Celery 后台工作程序(我已经在使用)中通过多个任务发送并行 url 请求。 Celery background worker 使用 Eventlet 和多个 worker 来处理大量并发任务(尤其是有很多 I/O 等待时间)
使用下面的代码,我用相同的 URL 调用 Celery 任务两次,然后每隔 x 毫秒检查一次是否有某个请求已经完成。如果有,就采用最先完成的请求并取消另一个 Celery 任务。此方案的唯一限制是:在使用 Eventlet 运行时,Celery 不支持彻底终止任务。将来,我可能会通过在 Redis 中使用一个键来改进这一点,让两个并行任务检查对方是否已经完成;如果是,就可以取消剩余的任务。
from datetime import datetime
from time import sleep

from app.blueprints.api.v1.tasks import parallel_request

# NOTE(review): search_url, proxy0 and proxy4 are not defined in this snippet;
# they are expected to come from the surrounding code.
t_start = datetime.now()

# Launch the same request twice as parallel Celery background tasks.
job1 = parallel_request.apply_async(args=[search_url])
job2 = parallel_request.apply_async(args=[search_url])

# Poll both jobs until one of them reports ready; there is no upper bound on
# the wait.
while True:
    if job1.ready():
        print("Parallel job 1 finished first")
        job, job_cancel, proxy = job1, job2, proxy0
        break
    if job2.ready():
        print("Parallel job 2 finished first")
        job, job_cancel, proxy = job2, job1, proxy4
        break
    # Neither job is done yet: wait 100 ms before checking again.
    sleep(0.1)

t_end = datetime.now()
proxy_time = int((t_end - t_start).total_seconds() * 1000)
print("Result in {} ms".format(proxy_time))
data = job.get()

# Revoke the slower duplicate through its own AsyncResult. Terminate/SIGKILL
# does not work when Celery runs on Eventlet.
job_cancel.revoke()