Scrape dynamic table with pagination while preserving links
I'm a beginner Python programmer trying to scrape a dynamic table (a DataTable) that uses pagination. There are "First" and "Previous" pagination buttons indexed "0" and "1" respectively, followed by the numbered buttons (see attached image), so I want to start with button 1, which has index "2", and then loop through the pages until I've captured the complete table with all of its links intact.
<a href="#" aria-controls="datatable" data-dt-idx="2" tabindex="0">1</a>
I managed to scrape the information from the first ten table rows, but I have no idea how to continue on to the remaining pages. I think I need to somehow loop over those pagination buttons(?). After reading countless tutorials and Stack Overflow questions and watching several YouTube videos, I managed to piece together the following code. However, I end up with the HTML for the whole site rather than just my table, and I only retrieve the first 10 rows of the table from the first page.
from bs4 import BeautifulSoup
import pandas as pd
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as ec
from selenium.webdriver.chrome.options import Options
chrome_options = Options()
chrome_options.add_argument('--headless')
driver = webdriver.Chrome(r"C:\Users\MyName\chromedriver", options=chrome_options)
url = "https://www.fda.gov/inspections-compliance-enforcement-and-criminal-investigations/compliance-actions-and-activities/warning-letters"
driver.get(url)
table_confirm = WebDriverWait(driver, 20).until(
    ec.presence_of_element_located((By.ID, "datatable"))
)
page_source = driver.page_source
soup = BeautifulSoup(page_source,'lxml')
print(soup)
data = []
table = soup.find('table', {'class':'lcds-datatable table table-bordered cols-8 responsive-enabled dataTable no-footer dtr-inline collapsed'})
table_body = table.find('tbody')
rows = table_body.find_all('tr')
for row in rows:
    cols = row.find_all('td')
    cols = [ele.text.strip() for ele in cols]
    data.append([ele for ele in cols if ele])
Can someone please help me out? Thanks.
[1]: https://i.stack.imgur.com/RUsui.png
If you view the page in your browser, and use your browser's developer tools to log your network traffic while navigating the pages, you'll see that every time you change pages an XHR (XmlHttpRequest) HTTP GET request is made to a REST API, whose response is JSON and contains all the information you're trying to scrape. That JSON is then normally parsed and used to populate the DOM asynchronously with JavaScript.

To get the data you're looking for, all you have to do is imitate that request. Selenium is overkill for this - all you really need is requests. You can even tweak the request slightly to suit your needs. For example, by default the request the page makes only grabs the next 10 results/entries; I changed mine to grab 100 at a time, though there's no real advantage or disadvantage either way.
def make_pretty(entry):
    import re

    pattern = ">([^<]*)<"

    return {
        "posted_date": re.search(pattern, entry[0]).group(1),
        "letter_issue_date": re.search(pattern, entry[1]).group(1),
        "company_name": re.search(pattern, entry[2]).group(1),
        "issuing_office": entry[3],
        "subject": entry[4],
        "response_letter": entry[5],
        "closeout_letter": entry[6]
    }


def get_entries():
    import requests
    from itertools import count

    url = "https://www.fda.gov/datatables/views/ajax"

    group_length = 100

    params = {
        "length": group_length,
        "view_display_id": "warning_letter_solr_block",
        "view_name": "warning_letter_solr_index",
    }

    headers = {
        "user-agent": "Mozilla/5.0",
        "x-requested-with": "XMLHttpRequest"
    }

    for current_group in count(0):
        start = current_group * group_length
        end = ((current_group + 1) * group_length) - 1

        params["start"] = start

        response = requests.get(url, params=params, headers=headers)
        response.raise_for_status()

        data = response.json()

        if not data["data"]:
            break

        yield from map(make_pretty, data["data"])
        print("yielding {}-{}".format(start, min(end, data["recordsFiltered"])))


def main():
    global all_entries
    all_entries = list(get_entries())

    print("Total number of entries: {}".format(len(all_entries)))

    return 0


if __name__ == "__main__":
    import sys
    sys.exit(main())
Output:
yielding 0-99
yielding 100-199
yielding 200-299
yielding 300-399
yielding 400-499
yielding 500-599
yielding 600-699
yielding 700-799
yielding 800-899
yielding 900-999
yielding 1000-1099
yielding 1100-1199
yielding 1200-1299
yielding 1300-1399
yielding 1400-1499
yielding 1500-1599
yielding 1600-1699
yielding 1700-1799
yielding 1800-1899
yielding 1900-1999
yielding 2000-2099
yielding 2100-2199
yielding 2200-2299
yielding 2300-2399
yielding 2400-2499
yielding 2500-2599
yielding 2600-2658
Total number of entries: 2658
all_entries[0]
{'posted_date': '11/10/2021', 'letter_issue_date': '11/10/2021', 'company_name': 'Wyoming Vapor Company', 'issuing_office': 'Center for Tobacco Products', 'subject': 'Family Smoking Prevention and Tobacco Control Act/Adulterated/Misbranded', 'response_letter': '', 'closeout_letter': ''}
get_entries is a generator that makes requests to the REST API and yields individual entries until there are no more.
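Because it's a generator, you can also consume it lazily. Here is a minimal sketch that only pulls the first few entries (itertools.islice stops the generator early, so only the first page is actually requested):

from itertools import islice

# Print a small sample without fetching every page
for entry in islice(get_entries(), 3):
    print(entry["letter_issue_date"], entry["company_name"])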
make_pretty is used to make the individual entries we yield from get_entries "pretty". In the JSON we receive, each "entry" corresponds to a list of strings, some of which are HTML. make_pretty just naively parses those HTML strings in each entry and returns a dictionary of key-value pairs for that entry, which is much cleaner to work with.
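To illustrate the shape of the data, here is a hypothetical raw entry (the values are made up; only the structure reflects what the API returns) and what make_pretty does with it:

raw_entry = [
    '<time datetime="2021-11-10">11/10/2021</time>',        # posted date, wrapped in HTML
    '<time datetime="2021-11-10">11/10/2021</time>',        # letter issue date, wrapped in HTML
    '<a href="/some-warning-letter">Some Company LLC</a>',  # company name with its link
    'Center for Tobacco Products',                          # issuing office, plain text
    'Some subject line',                                    # subject, plain text
    '',                                                     # response letter (often empty)
    ''                                                      # closeout letter (often empty)
]

make_pretty(raw_entry)
# {'posted_date': '11/10/2021', 'letter_issue_date': '11/10/2021',
#  'company_name': 'Some Company LLC', 'issuing_office': 'Center for Tobacco Products',
#  'subject': 'Some subject line', 'response_letter': '', 'closeout_letter': ''}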
main is the main entry point of the script. We call get_entries and consume every item from the generator, letting the entries accumulate in the all_entries list. I only added the global all_entries line so that I could play around with and inspect all_entries in the Python shell after the script finished running - it isn't required.
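Since your own script already imports pandas, one convenient way to work with the collected entries afterwards is to load them into a DataFrame. A minimal sketch, assuming all_entries has been populated as above (the output filename is just illustrative):

import pandas as pd

df = pd.DataFrame(all_entries)                  # one row per warning letter, one column per key
print(df.head())
df.to_csv("warning_letters.csv", index=False)   # optional: persist to disk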
Take a look at a similar question I posted, where I go into more depth on using your browser's developer tools, logging network traffic, finding and imitating XHR requests, and how to inspect the response.
EDIT: Here's the updated code:
keys = (
    "posted_date",          # entry[0]
    "letter_issue_date",    # entry[1]
    "company_name",         # entry[2]
    "company_url",          # entry[2]
    "issuing_office",       # entry[3]
    "subject",              # entry[4]
    "response_letter_url",  # entry[5]
    "closeout_letter_url"   # entry[6]
)


def make_pretty(entry):
    from bs4 import BeautifulSoup as Soup
    import re

    pattern = "[^<]*"

    return dict(zip(keys, [
        Soup(entry[0], "html.parser").text.strip(),
        Soup(entry[1], "html.parser").text.strip(),
        Soup(entry[2], "html.parser").text.strip(),
        entry[2] and "https://www.fda.gov" + Soup(entry[2], "html.parser").find("a")["href"],
        entry[3].strip(),
        re.search(pattern, entry[4]).group(),
        entry[5] and "https://www.fda.gov" + Soup(entry[5], "html.parser").find("a")["href"],
        entry[6] and "https://www.fda.gov" + Soup(entry[6], "html.parser").find("a")["href"]
    ]))


def get_entries():
    import requests
    from itertools import count

    url = "https://www.fda.gov/datatables/views/ajax"

    group_length = 100

    params = {
        "length": group_length,
        "view_display_id": "warning_letter_solr_block",
        "view_name": "warning_letter_solr_index",
    }

    headers = {
        "user-agent": "Mozilla/5.0",
        "x-requested-with": "XMLHttpRequest"
    }

    for current_group in count(0):
        start = current_group * group_length
        end = ((current_group + 1) * group_length) - 1

        params["start"] = start

        response = requests.get(url, params=params, headers=headers)
        response.raise_for_status()

        data = response.json()

        if not data["data"]:
            break

        yield from map(make_pretty, data["data"])
        print("yielding {}-{}".format(start, min(end, data["recordsFiltered"])))


def main():
    import csv

    with open("output.csv", "w", newline="", encoding="utf-8") as file:
        writer = csv.DictWriter(file, fieldnames=keys, quoting=csv.QUOTE_ALL)
        writer.writeheader()
        writer.writerows(get_entries())

    print("Done writing.")

    return 0


if __name__ == "__main__":
    import sys
    sys.exit(main())
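If you want to check that the links really were preserved, or keep working with the data in pandas, here is a minimal sketch that reads back the file written above (assuming output.csv is in the current working directory):

import pandas as pd

df = pd.read_csv("output.csv")

# The *_url columns now hold the absolute links alongside the text fields.
print(df[["company_name", "company_url", "response_letter_url"]].head())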