非阻塞 Scrapy 管道到数据库
Nonblocking Scrapy pipeline to database
我在 Scrapy 中有一个网络抓取工具,可以获取数据项。我也想将它们异步插入到数据库中。
例如,我有一个使用 SQLAlchemy Core 将一些项目插入我的数据库的事务:
def process_item(self, item, spider):
with self.connection.begin() as conn:
conn.execute(insert(table1).values(item['part1'])
conn.execute(insert(table2).values(item['part2'])
我知道可以通过 alchimia
将 SQLAlchemy Core 与 Twisted 异步使用。 alchimia
的文档代码示例如下。
我不明白的是如何在 alchimia 框架中使用我上面的代码。如何设置 process_item
以使用反应器?
我可以做这样的事情吗?
@inlineCallbacks
def process_item(self, item, spider):
with self.connection.begin() as conn:
yield conn.execute(insert(table1).values(item['part1'])
yield conn.execute(insert(table2).values(item['part2'])
reactor部分怎么写?
或者是否有更简单的方法在 Scrapy 管道中进行非阻塞数据库插入?
作为参考,这里是 alchimia
文档中的代码示例:
from alchimia import TWISTED_STRATEGY
from sqlalchemy import (
create_engine, MetaData, Table, Column, Integer, String
)
from sqlalchemy.schema import CreateTable
from twisted.internet.defer import inlineCallbacks
from twisted.internet.task import react
@inlineCallbacks
def main(reactor):
engine = create_engine(
"sqlite://", reactor=reactor, strategy=TWISTED_STRATEGY
)
metadata = MetaData()
users = Table("users", metadata,
Column("id", Integer(), primary_key=True),
Column("name", String()),
)
# Create the table
yield engine.execute(CreateTable(users))
# Insert some users
yield engine.execute(users.insert().values(name="Jeremy Goodwin"))
yield engine.execute(users.insert().values(name="Natalie Hurley"))
yield engine.execute(users.insert().values(name="Dan Rydell"))
yield engine.execute(users.insert().values(name="Casey McCall"))
yield engine.execute(users.insert().values(name="Dana Whitaker"))
result = yield engine.execute(users.select(users.c.name.startswith("D")))
d_users = yield result.fetchall()
# Print out the users
for user in d_users:
print "Username: %s" % user[users.c.name]
if __name__ == "__main__":
react(main, [])
How can I set up process_item to use a reactor?
您不需要管理管道中的另一个反应器。
相反,您可以通过从管道返回延迟来在项目管道内进行异步数据库交互。
另见 Scrapy's doc and sample code doing asynchronous operations within an item pipeline by returning a deferred。
我在 Scrapy 中有一个网络抓取工具,可以获取数据项。我也想将它们异步插入到数据库中。
例如,我有一个使用 SQLAlchemy Core 将一些项目插入我的数据库的事务:
def process_item(self, item, spider):
with self.connection.begin() as conn:
conn.execute(insert(table1).values(item['part1'])
conn.execute(insert(table2).values(item['part2'])
我知道可以通过 alchimia
将 SQLAlchemy Core 与 Twisted 异步使用。 alchimia
的文档代码示例如下。
我不明白的是如何在 alchimia 框架中使用我上面的代码。如何设置 process_item
以使用反应器?
我可以做这样的事情吗?
@inlineCallbacks
def process_item(self, item, spider):
with self.connection.begin() as conn:
yield conn.execute(insert(table1).values(item['part1'])
yield conn.execute(insert(table2).values(item['part2'])
reactor部分怎么写?
或者是否有更简单的方法在 Scrapy 管道中进行非阻塞数据库插入?
作为参考,这里是 alchimia
文档中的代码示例:
from alchimia import TWISTED_STRATEGY
from sqlalchemy import (
create_engine, MetaData, Table, Column, Integer, String
)
from sqlalchemy.schema import CreateTable
from twisted.internet.defer import inlineCallbacks
from twisted.internet.task import react
@inlineCallbacks
def main(reactor):
engine = create_engine(
"sqlite://", reactor=reactor, strategy=TWISTED_STRATEGY
)
metadata = MetaData()
users = Table("users", metadata,
Column("id", Integer(), primary_key=True),
Column("name", String()),
)
# Create the table
yield engine.execute(CreateTable(users))
# Insert some users
yield engine.execute(users.insert().values(name="Jeremy Goodwin"))
yield engine.execute(users.insert().values(name="Natalie Hurley"))
yield engine.execute(users.insert().values(name="Dan Rydell"))
yield engine.execute(users.insert().values(name="Casey McCall"))
yield engine.execute(users.insert().values(name="Dana Whitaker"))
result = yield engine.execute(users.select(users.c.name.startswith("D")))
d_users = yield result.fetchall()
# Print out the users
for user in d_users:
print "Username: %s" % user[users.c.name]
if __name__ == "__main__":
react(main, [])
How can I set up process_item to use a reactor?
您不需要管理管道中的另一个反应器。
相反,您可以通过从管道返回延迟来在项目管道内进行异步数据库交互。
另见 Scrapy's doc and sample code doing asynchronous operations within an item pipeline by returning a deferred。