Twisted/Python - processing a large file line by line
I have code that reads a file and processes it. The file is quite big, 12 million lines, so currently I split it manually into 1,000-line files and launch each process sequentially for every 1,000 lines (a bash script).
Is there a way to use Twisted to load one file and process 1,000 items at a time from it (a progress bar would be nice) without me having to split it manually?
scanner.py
import argparse
from tqdm import tqdm
from sys import argv
from pprint import pformat
from twisted.internet.task import react
from twisted.web.client import Agent, readBody
from twisted.web.http_headers import Headers
import lxml.html
from geoip import geolite2
import pycountry
from tld import get_tld
import json
import re  # needed for the e-mail regex in cbBody
import socket

poweredby = ""
server = ""
ip = ""


def cbRequest(response, url):
    global poweredby, server, ip
    # print 'Response version:', response.version
    # print 'Response code:', response.code
    # print 'Response phrase:', response.phrase
    # print 'Response headers:'
    # print pformat(list(response.headers.getAllRawHeaders()))
    poweredby = response.headers.getRawHeaders("X-Powered-By")[0]
    server = response.headers.getRawHeaders("Server")[0]
    #print poweredby
    #print server
    d = readBody(response)
    d.addCallback(cbBody, url)
    return d


def cbBody(body, ourl):
    global poweredby, server, ip
    #print body
    html_element = lxml.html.fromstring(body)
    generator = html_element.xpath("//meta[@name='generator']/@content")
    ip = socket.gethostbyname(ourl)
    try:
        match = geolite2.lookup(ip)
        if match is not None:
            country = match.country
            try:
                c = pycountry.countries.lookup(country)
                country = c.name
            except:
                country = ""
    except:
        country = ""
    try:
        res = get_tld("http://www" + ourl, as_object=True)
        tld = res.suffix
    except:
        tld = ""
    try:
        match = re.search(r'[\w\.-]+@[\w\.-]+', body)
        email = match.group(0)
    except:
        email = ""
    permalink = ourl.rstrip().replace(".", "-")
    try:
        item = generator[0]
        val = "{ \"Domain\":" + json.dumps(
            "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\",\"Server\":" + json.dumps(
            str(server)) + ",\"PoweredBy\":" + json.dumps(
            str(poweredby)) + ",\"MetaGenerator\":" + json.dumps(item) + ",\"Email\":" + json.dumps(
            email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country + "\",\"permalink\":\"" + permalink + "\" }"
    except:
        val = "{ \"Domain\":" + json.dumps(
            "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\"," + "\"Server\":" + json.dumps(
            str(server)) + ",\"PoweredBy\":" + json.dumps(
            str(poweredby)) + ",\"MetaGenerator\":\"\",\"Email\":" + json.dumps(
            email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country + "\",\"permalink\":\"" + permalink + "\" }"
    print val


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Scanner v0.99')
    parser.add_argument(
        '-i', '--input', help='Input list of domains', required=True)
    args = parser.parse_args()
    input = args.input

    with open(input) as f:
        urls = f.read().splitlines()

    def mainjob(reactor, urls=urls):
        for url in tqdm(urls):
            agent = Agent(reactor)
            d = agent.request(
                'GET', "http://" + url,
                Headers({'User-Agent': ['bot']}),
                None)
            d.addCallback(cbRequest, url)
            d.addErrback(lambda x: None)  # ignore errors
        return d

    react(mainjob, argv[3:])
Update 1:
This is how I execute it now:
file.txt - 12,000,000 lines
chunk01.txt - files of 1,000 lines each
.
.
.
I execute the script once for each chunk file:
python scanner.py chunk01.txt
python scanner.py chunk02.txt
.
.
.
To execute the script only once:
python scanner.py file.txt
The problem is that I need to pass the URLs as an argument to react(). If I read all 12,000,000 lines into memory (via f.read()), it is too big. That is why I split the file and run the script on each small file.
Hopefully it is clearer now...
Update 2:
Based on @Jean-Paul Calderone's answer, I wrote this code.
It seems to work, but I have been stumped ever since:
For 180,000 iterations... which I assume means 180,000 domains (one for each line of the input file), the script only printed/outputted ca. 35,707 lines (entries). I expected it to be somewhere near 180,000... I know some domains time out. When I ran it the "old" way it was more consistent, the numbers were closer, i.e. the number of input domains was close to the number of output lines in the output file.
Could the code be "bad"? Any ideas?
python scanner.py > out.txt
181668it [1:47:36, 4.82it/s]
And counting the lines:
wc -l out.txt
36840 out.txt
scanner.py
import argparse
from tqdm import tqdm
from sys import argv
from pprint import pformat
from twisted.internet.task import react
from twisted.web.client import Agent, readBody
from twisted.web.http_headers import Headers
from twisted.internet.task import cooperate
from twisted.internet.defer import gatherResults
import lxml.html
from geoip import geolite2
import pycountry
from tld import get_tld
import json
import re  # needed for the e-mail regex in cbBody
import socket

poweredby = ""
server = ""
ip = ""


def cbRequest(response, url):
    global poweredby, server, ip
    # print 'Response version:', response.version
    # print 'Response code:', response.code
    # print 'Response phrase:', response.phrase
    # print 'Response headers:'
    # print pformat(list(response.headers.getAllRawHeaders()))
    poweredby = response.headers.getRawHeaders("X-Powered-By")[0]
    server = response.headers.getRawHeaders("Server")[0]
    #print poweredby
    #print server
    d = readBody(response)
    d.addCallback(cbBody, url)
    return d


def cbBody(body, ourl):
    global poweredby, server, ip
    #print body
    html_element = lxml.html.fromstring(body)
    generator = html_element.xpath("//meta[@name='generator']/@content")
    ip = socket.gethostbyname(ourl)
    try:
        match = geolite2.lookup(ip)
        if match is not None:
            country = match.country
            try:
                c = pycountry.countries.lookup(country)
                country = c.name
            except:
                country = ""
    except:
        country = ""
    try:
        res = get_tld("http://www" + ourl, as_object=True)
        tld = res.suffix
    except:
        tld = ""
    try:
        match = re.search(r'[\w\.-]+@[\w\.-]+', body)
        email = match.group(0)
    except:
        email = ""
    permalink = ourl.rstrip().replace(".", "-")
    try:
        item = generator[0]
        val = "{ \"Domain\":" + json.dumps(
            "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\",\"Server\":" + json.dumps(
            str(server)) + ",\"PoweredBy\":" + json.dumps(
            str(poweredby)) + ",\"MetaGenerator\":" + json.dumps(item) + ",\"Email\":" + json.dumps(
            email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country + "\",\"permalink\":\"" + permalink + "\" }"
    except:
        val = "{ \"Domain\":" + json.dumps(
            "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\"," + "\"Server\":" + json.dumps(
            str(server)) + ",\"PoweredBy\":" + json.dumps(
            str(poweredby)) + ",\"MetaGenerator\":\"\",\"Email\":" + json.dumps(
            email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country + "\",\"permalink\":\"" + permalink + "\" }"
    print val


def main(reactor, url_path):
    urls = open(url_path)
    return mainjob(reactor, (url.strip() for url in urls))


def mainjob(reactor, urls=argv[2:]):
    #for url in urls:
    #    print url
    agent = Agent(reactor)
    work = (process(agent, url) for url in tqdm(urls))
    tasks = list(cooperate(work) for i in range(100))
    return gatherResults(list(task.whenDone() for task in tasks))


def process(agent, url):
    d = agent.request(
        'GET', "http://" + url,
        Headers({'User-Agent': ['bot']}),
        None)
    d.addCallback(cbRequest, url)
    d.addErrback(lambda x: None)  # ignore errors
    return d


react(main, ["./domains.txt"])
Update 3:
Updated the code to print errors to errors.txt.
import argparse
from tqdm import tqdm
from sys import argv
from pprint import pformat
from twisted.internet.task import react
from twisted.web.client import Agent, readBody
from twisted.web.http_headers import Headers
from twisted.internet.task import cooperate
from twisted.internet.defer import gatherResults
import lxml.html
from geoip import geolite2
import pycountry
from tld import get_tld
import json
import re  # needed for the e-mail regex in cbBody
import socket

poweredby = ""
server = ""
ip = ""

f = open("errors.txt", "w")


def error(response, url):
    # Errback: 'response' here is actually a Twisted Failure object.
    f.write("Error: " + url + "\n")


def cbRequest(response, url):
    global poweredby, server, ip
    # print 'Response version:', response.version
    # print 'Response code:', response.code
    # print 'Response phrase:', response.phrase
    # print 'Response headers:'
    # print pformat(list(response.headers.getAllRawHeaders()))
    poweredby = response.headers.getRawHeaders("X-Powered-By")[0]
    server = response.headers.getRawHeaders("Server")[0]
    #print poweredby
    #print server
    d = readBody(response)
    d.addCallback(cbBody, url)
    return d


def cbBody(body, ourl):
    global poweredby, server, ip
    #print body
    html_element = lxml.html.fromstring(body)
    generator = html_element.xpath("//meta[@name='generator']/@content")
    ip = socket.gethostbyname(ourl)
    try:
        match = geolite2.lookup(ip)
        if match is not None:
            country = match.country
            try:
                c = pycountry.countries.lookup(country)
                country = c.name
            except:
                country = ""
    except:
        country = ""
    try:
        res = get_tld("http://www" + ourl, as_object=True)
        tld = res.suffix
    except:
        tld = ""
    try:
        match = re.search(r'[\w\.-]+@[\w\.-]+', body)
        email = match.group(0)
    except:
        email = ""
    permalink = ourl.rstrip().replace(".", "-")
    try:
        item = generator[0]
        val = "{ \"Domain\":" + json.dumps(
            "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\",\"Server\":" + json.dumps(
            str(server)) + ",\"PoweredBy\":" + json.dumps(
            str(poweredby)) + ",\"MetaGenerator\":" + json.dumps(item) + ",\"Email\":" + json.dumps(
            email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country + "\",\"permalink\":\"" + permalink + "\" }"
    except:
        val = "{ \"Domain\":" + json.dumps(
            "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\"," + "\"Server\":" + json.dumps(
            str(server)) + ",\"PoweredBy\":" + json.dumps(
            str(poweredby)) + ",\"MetaGenerator\":\"\",\"Email\":" + json.dumps(
            email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country + "\",\"permalink\":\"" + permalink + "\" }"
    print val


def main(reactor, url_path):
    urls = open(url_path)
    return mainjob(reactor, (url.strip() for url in urls))


def mainjob(reactor, urls=argv[2:]):
    #for url in urls:
    #    print url
    agent = Agent(reactor)
    work = (process(agent, url) for url in tqdm(urls))
    tasks = list(cooperate(work) for i in range(100))
    return gatherResults(list(task.whenDone() for task in tasks))


def process(agent, url):
    d = agent.request(
        'GET', "http://" + url,
        Headers({'User-Agent': ['crawler']}),
        None)
    d.addCallback(cbRequest, url)
    d.addErrback(error, url)
    return d


react(main, ["./domains.txt"])
f.close()
Update 4:
I captured the traffic with Wireshark with just 2 domains, domains that had errored previously:
user@laptop:~/crawler$ python scanner.py
2it [00:00, 840.71it/s]
user@laptop:~/crawler$ cat errors.txt
Error: google.al
Error: fau.edu.al
As you can see, they errored, but with Wireshark I can see the responses:
You need to add a limit to the amount of concurrency your program creates. Currently, you process all of the URLs you are given at the same time - or at least, you try to:
def mainjob(reactor, urls=urls):
    for url in tqdm(urls):
        agent = Agent(reactor)
        d = agent.request(
            'GET', "http://" + url,
            Headers({'User-Agent': ['bot']}),
            None)
        d.addCallback(cbRequest, url)
        d.addErrback(lambda x: None)  # ignore errors
    return d
This issues a request for every URL without waiting for any of them to complete. Instead, use twisted.internet.task.cooperate to run a limited number at a time. This runs one request at a time:
def mainjob(reactor, urls):
    agent = Agent(reactor)
    work = (process(agent, url) for url in tqdm(urls))
    task = cooperate(work)
    return task.whenDone()


def process(agent, url):
    d = agent.request(
        'GET', "http://" + url,
        Headers({'User-Agent': ['bot']}),
        None)
    d.addCallback(cbRequest, url)
    d.addErrback(lambda x: None)  # ignore errors
    return d
You probably want more than that, though. So, call cooperate() a few more times:
def mainjob(reactor, urls=urls):
    agent = Agent(reactor)
    work = (process(agent, url) for url in tqdm(urls))
    tasks = list(cooperate(work) for i in range(100))
    return gatherResults(list(task.whenDone() for task in tasks))
This runs at most 100 requests at a time. Each task pulls the next element from work and waits on it. gatherResults waits for all 100 tasks to finish.
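For intuition, here is a tiny standalone sketch (not part of the answer; the deferLater call merely simulates slow work) showing how several cooperate() calls share a single generator, so at most three items are in flight at once:

from twisted.internet.defer import gatherResults
from twisted.internet.task import cooperate, deferLater, react


def slow_job(reactor, n):
    # Simulate slow work: a Deferred that fires with n after 0.1 seconds.
    return deferLater(reactor, 0.1, lambda: n)


def main(reactor):
    work = (slow_job(reactor, n) for n in range(10))   # one shared generator
    tasks = [cooperate(work) for _ in range(3)]        # three cooperating consumers
    return gatherResults([t.whenDone() for t in tasks])


react(main)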
Now just avoid loading the complete input into memory at once:
def main(reactor, url_path):
    urls = open(url_path)
    return mainjob(reactor, (url.strip() for url in urls))


react(main, ["path-to-urls.txt"])
This will open the URL file but only read lines from it as they are needed.
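Putting the pieces together, a minimal self-contained sketch under the same assumptions (Python 2 with Twisted and tqdm installed, a domains.txt with one host per line; the names fetch and CONCURRENCY are illustrative, not from the answer):

from tqdm import tqdm
from twisted.internet.defer import gatherResults
from twisted.internet.task import cooperate, react
from twisted.web.client import Agent, readBody
from twisted.web.http_headers import Headers

CONCURRENCY = 100  # how many requests may be in flight at once


def fetch(agent, url):
    # Issue one request and read the body; errors are swallowed so that a
    # single bad host does not abort the whole run.
    d = agent.request('GET', "http://" + url,
                      Headers({'User-Agent': ['bot']}), None)
    d.addCallback(readBody)
    d.addCallback(lambda body: len(body))  # replace with real processing
    d.addErrback(lambda failure: None)
    return d


def main(reactor, url_path):
    agent = Agent(reactor)
    urls = (line.strip() for line in open(url_path))         # lazy, no f.read()
    work = (fetch(agent, url) for url in tqdm(urls))
    tasks = [cooperate(work) for _ in range(CONCURRENCY)]    # bounded concurrency
    return gatherResults([task.whenDone() for task in tasks])


react(main, ["domains.txt"])

Note that the tqdm bar here counts URLs as they are handed to the pool, not as responses arrive, which matches the behaviour of the code in the answer.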