Batch/Bulk SQL 插入 Scrapy 管道 [PostgreSQL]
Batch/Bulk SQL insert in Scrapy Pipelines [PostgreSQL]
我正在使用自己的管道将报废的项目存储到 PostgreSQL 数据库中,几天前我进行了扩展,现在将数据存储到 3 个数据库中。所以,我想让管道每 100 个项目调用一次插入数据的管道,或者它获取项目并将它们 100 x 100 插入。
我想让它在数据库服务器上变得更快和更少头痛的原因。
这不是一个 scrapy 选项,而是一个用于批量插入行的 psycopg2 选项。
这个问题有你可以使用的选项:
Psycopg2, Postgresql, Python: Fastest way to bulk-insert
我不知道 scrapy 是否有任何内置的队列功能,但也许你可以将你的查询推送到 standard python Queue from scrapy and then have a consumer that monitors the queue and as soon as there are a 100 items on it, execute them all, which can indeed be done by psycopg2 (see psycopg2: insert multiple rows with one query).
你可以做类似
queryQueue = Queue()
def queryConsumer(){
while True:
if queryQueue.qsize()==100:
queries=[queryQueue.get() for i in range(100)]
#execute the 100 queries
}
t = Thread(target=queryConsumer)
t.daemon = True
t.start()
从你的 scrapy 方法你可以调用
queryQueue.put(myquery)
将项目推送到队列中。
我的建议是,
在您的蜘蛛本身中,您可以做到这一点,根本不需要在管道中编写。
使用 psycopg2 在 spider 中创建一个 postgres 数据库连接。
psycopg2.connect(database="testdb", user="postgres", password="cohondob", host="127.0.0.1", port="5432")
connection.cursor()
在解析函数中创建一个元组并将报废的项目附加到该列表。
如果达到 100,则插入到数据库中,然后将其提交到数据库中。
例如:
x = ({"name":"q", "lname":"55"},
{"name":"e", "lname":"hh"},
{"name":"ee", "lname":"hh"})
cur.executemany("""INSERT INTO bar(name,lname) VALUES (%(name)s, %(lname)s)""", x)
解决方案与 Anandhakumar 的回答没有什么不同
我在设置文件中创建了一个全局列表,其中包含 setter 和 getter 方法
# This buffer for the bluk insertion
global products_buffer
products_buffer = []
# Append product to the list
def add_to_products_buffer(product):
global products_buffer
products_buffer.append(product)
# Get the length of the product
def get_products_buffer_len():
global products_buffer
return len(products_buffer)
# Get the products list
def get_products_buffer():
global products_buffer
return products_buffer
# Empty the list
def empty_products_buffer():
global products_buffer
products_buffer[:] = []
然后我在管道中导入了它
from project.settings import products_buffer,add_to_products_buffer,get_products_buffer_len,empty_products_buffer,get_products_buffer
每次调用管道时,我都会将项目附加到列表中,并检查列表的长度是否为 100 我在列表上循环以准备许多插入请求,但最重要的魔法是提交它们全部在一行中,不要在循环中提交,否则您将一无所获,并且将它们全部插入将花费很长时间。
def process_item(self, item, spider):
# Adding the item to the list
add_to_products_buffer(item)
# Check if the length is 100
if get_products_buffer_len() == 100:
# Get The list to loop on it
products_list = get_products_buffer()
for item in products_list:
# The insert query
self.cursor.execute('insert query')
try:
# Commit to DB the insertions quires
self.conn.commit()
# Emty the list
empty_products_buffer()
except Exception, e:
# Except the error
如果不想循环也可以使用executemany
我刚刚写了一个小的 scrapy 扩展来将抓取的项目保存到数据库中。 scrapy-sqlitem
超级好用。
pip install scrapy_sqlitem
使用 SqlAlchemy 表定义 Scrapy 项目
from scrapy_sqlitem import SqlItem
class MyItem(SqlItem):
sqlmodel = Table('mytable', metadata
Column('id', Integer, primary_key=True),
Column('name', String, nullable=False))
编写你的爬虫并继承自 SqlSpider
from scrapy_sqlitem import SqlSpider
class MySpider(SqlSpider):
name = 'myspider'
start_urls = ('http://dmoz.org',)
def parse(self, response):
selector = Selector(response)
item = MyItem()
item['name'] = selector.xpath('//title[1]/text()').extract_first()
yield item
将 DATABASE_URI 和块大小设置添加到 settings.py。
DATABASE_URI = "postgresql:///mydb"
DEFAULT_CHUNKSIZE = 100
CHUNKSIZE_BY_TABLE = {'mytable': 100, 'othertable': 250}
创建表格,大功告成!
http://doc.scrapy.org/en/1.0/topics/item-pipeline.html#activating-an-item-pipeline-component
http://docs.sqlalchemy.org/en/rel_1_1/core/tutorial.html#define-and-create-tables
我正在使用自己的管道将报废的项目存储到 PostgreSQL 数据库中,几天前我进行了扩展,现在将数据存储到 3 个数据库中。所以,我想让管道每 100 个项目调用一次插入数据的管道,或者它获取项目并将它们 100 x 100 插入。
我想让它在数据库服务器上变得更快和更少头痛的原因。
这不是一个 scrapy 选项,而是一个用于批量插入行的 psycopg2 选项。 这个问题有你可以使用的选项: Psycopg2, Postgresql, Python: Fastest way to bulk-insert
我不知道 scrapy 是否有任何内置的队列功能,但也许你可以将你的查询推送到 standard python Queue from scrapy and then have a consumer that monitors the queue and as soon as there are a 100 items on it, execute them all, which can indeed be done by psycopg2 (see psycopg2: insert multiple rows with one query).
你可以做类似
queryQueue = Queue()
def queryConsumer(){
while True:
if queryQueue.qsize()==100:
queries=[queryQueue.get() for i in range(100)]
#execute the 100 queries
}
t = Thread(target=queryConsumer)
t.daemon = True
t.start()
从你的 scrapy 方法你可以调用
queryQueue.put(myquery)
将项目推送到队列中。
我的建议是,
在您的蜘蛛本身中,您可以做到这一点,根本不需要在管道中编写。
使用 psycopg2 在 spider 中创建一个 postgres 数据库连接。
psycopg2.connect(database="testdb", user="postgres", password="cohondob", host="127.0.0.1", port="5432")
connection.cursor()
在解析函数中创建一个元组并将报废的项目附加到该列表。
如果达到 100,则插入到数据库中,然后将其提交到数据库中。
例如:
x = ({"name":"q", "lname":"55"},
{"name":"e", "lname":"hh"},
{"name":"ee", "lname":"hh"})
cur.executemany("""INSERT INTO bar(name,lname) VALUES (%(name)s, %(lname)s)""", x)
解决方案与 Anandhakumar 的回答没有什么不同 我在设置文件中创建了一个全局列表,其中包含 setter 和 getter 方法
# This buffer for the bluk insertion
global products_buffer
products_buffer = []
# Append product to the list
def add_to_products_buffer(product):
global products_buffer
products_buffer.append(product)
# Get the length of the product
def get_products_buffer_len():
global products_buffer
return len(products_buffer)
# Get the products list
def get_products_buffer():
global products_buffer
return products_buffer
# Empty the list
def empty_products_buffer():
global products_buffer
products_buffer[:] = []
然后我在管道中导入了它
from project.settings import products_buffer,add_to_products_buffer,get_products_buffer_len,empty_products_buffer,get_products_buffer
每次调用管道时,我都会将项目附加到列表中,并检查列表的长度是否为 100 我在列表上循环以准备许多插入请求,但最重要的魔法是提交它们全部在一行中,不要在循环中提交,否则您将一无所获,并且将它们全部插入将花费很长时间。
def process_item(self, item, spider):
# Adding the item to the list
add_to_products_buffer(item)
# Check if the length is 100
if get_products_buffer_len() == 100:
# Get The list to loop on it
products_list = get_products_buffer()
for item in products_list:
# The insert query
self.cursor.execute('insert query')
try:
# Commit to DB the insertions quires
self.conn.commit()
# Emty the list
empty_products_buffer()
except Exception, e:
# Except the error
如果不想循环也可以使用executemany
我刚刚写了一个小的 scrapy 扩展来将抓取的项目保存到数据库中。 scrapy-sqlitem
超级好用。
pip install scrapy_sqlitem
使用 SqlAlchemy 表定义 Scrapy 项目
from scrapy_sqlitem import SqlItem
class MyItem(SqlItem):
sqlmodel = Table('mytable', metadata
Column('id', Integer, primary_key=True),
Column('name', String, nullable=False))
编写你的爬虫并继承自 SqlSpider
from scrapy_sqlitem import SqlSpider
class MySpider(SqlSpider):
name = 'myspider'
start_urls = ('http://dmoz.org',)
def parse(self, response):
selector = Selector(response)
item = MyItem()
item['name'] = selector.xpath('//title[1]/text()').extract_first()
yield item
将 DATABASE_URI 和块大小设置添加到 settings.py。
DATABASE_URI = "postgresql:///mydb"
DEFAULT_CHUNKSIZE = 100
CHUNKSIZE_BY_TABLE = {'mytable': 100, 'othertable': 250}
创建表格,大功告成!
http://doc.scrapy.org/en/1.0/topics/item-pipeline.html#activating-an-item-pipeline-component
http://docs.sqlalchemy.org/en/rel_1_1/core/tutorial.html#define-and-create-tables