用于多蜘蛛的 Scrapy Item 管道
Scrapy Item pipeline for multi spiders
我有 2 只蜘蛛,运行 它在这里:
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
settings = get_project_settings()
process1 = CrawlerProcess(settings)
process1.crawl('spider1')
process1.crawl('spider2')
process1.start()
我希望这些蜘蛛写一个通用文件。
这是管道 class:
class FilePipeline(object):
def __init__(self):
self.file = codecs.open('data.txt', 'w', encoding='utf-8')
self.spiders = []
def open_spider(self, spider):
self.spiders.append(spider.name)
def process_item(self, item, spider):
line = json.dumps(OrderedDict(item), ensure_ascii=False, sort_keys=False) + "\n"
self.file.write(line)
return item
def spider_closed(self, spider):
self.spiders.remove(spider.name)
if len(self.spiders) == 0:
self.file.close()
但是虽然我没有收到错误消息,但当所有蜘蛛都完成写入公共文件时,我的行(项目)比 scrapy 日志少。剪了几行。也许两个蜘蛛同时在一个文件中写一些练习?
更新:
谢谢大家!)
我是这样实现的:
class FilePipeline1(object):
lock = threading.Lock()
datafile = codecs.open('myfile.txt', 'w', encoding='utf-8')
def __init__(self):
pass
def open_spider(self, spider):
pass
def process_item(self, item, spider):
line = json.dumps(OrderedDict(item), ensure_ascii=False, sort_keys=False) + "\n"
try:
FilePipeline1.lock.acquire()
if isinstance(item, VehicleItem):
FilePipeline1.datafile.write(line)
except:
pass
finally:
FilePipeline1.lock.release()
return item
def spider_closed(self, spider):
pass
您在单独线程中的两个蜘蛛同时写入文件。正如 past 所说, 将 导致一些问题,例如,如果您不注意同步,行会被切断,其中一些会丢失。为此,您需要同步文件访问并只写入整个 record/lines,或者制定将文件区域分配给不同线程的策略,例如重新构建一个已知偏移量和大小的文件,默认情况下你没有这些。一般来说,从两个不同的线程同时写入同一个文件是不常见的方法,除非你真的知道自己在做什么,否则我不建议你这样做。
相反,我会分离蜘蛛 IO 函数,并在开始另一个之前等待一个动作完成 - 考虑到您的线程未同步,这将使程序更高效并使其工作: ) 如果您想要一个代码示例来说明如何在您的上下文中执行此操作,请提出要求,我很乐意提供。
我同意 A. Abramov 的回答。
这只是我的一个想法。您可以在您选择的数据库中创建两个表,然后在两个蜘蛛完成爬网后合并它们。您必须跟踪日志进入的时间,以便您可以根据收到的时间对日志进行排序。然后您可以将数据库转储到您想要的任何文件类型中。这样一来,程序不必等待一个进程完成就可以写入文件,您也不必进行任何多线程编程。
更新:
实际上,根据您的蜘蛛 运行 的长度,您可以将日志输出和时间存储到字典中。时间是键,日志输出是值。这比初始化数据库更容易。然后您可以按键顺序将字典转储到您的文件中。
我有 2 只蜘蛛,运行 它在这里:
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
settings = get_project_settings()
process1 = CrawlerProcess(settings)
process1.crawl('spider1')
process1.crawl('spider2')
process1.start()
我希望这些蜘蛛写一个通用文件。
这是管道 class:
class FilePipeline(object):
def __init__(self):
self.file = codecs.open('data.txt', 'w', encoding='utf-8')
self.spiders = []
def open_spider(self, spider):
self.spiders.append(spider.name)
def process_item(self, item, spider):
line = json.dumps(OrderedDict(item), ensure_ascii=False, sort_keys=False) + "\n"
self.file.write(line)
return item
def spider_closed(self, spider):
self.spiders.remove(spider.name)
if len(self.spiders) == 0:
self.file.close()
但是虽然我没有收到错误消息,但当所有蜘蛛都完成写入公共文件时,我的行(项目)比 scrapy 日志少。剪了几行。也许两个蜘蛛同时在一个文件中写一些练习?
更新:
谢谢大家!) 我是这样实现的:
class FilePipeline1(object):
lock = threading.Lock()
datafile = codecs.open('myfile.txt', 'w', encoding='utf-8')
def __init__(self):
pass
def open_spider(self, spider):
pass
def process_item(self, item, spider):
line = json.dumps(OrderedDict(item), ensure_ascii=False, sort_keys=False) + "\n"
try:
FilePipeline1.lock.acquire()
if isinstance(item, VehicleItem):
FilePipeline1.datafile.write(line)
except:
pass
finally:
FilePipeline1.lock.release()
return item
def spider_closed(self, spider):
pass
您在单独线程中的两个蜘蛛同时写入文件。正如 past 所说, 将 导致一些问题,例如,如果您不注意同步,行会被切断,其中一些会丢失。为此,您需要同步文件访问并只写入整个 record/lines,或者制定将文件区域分配给不同线程的策略,例如重新构建一个已知偏移量和大小的文件,默认情况下你没有这些。一般来说,从两个不同的线程同时写入同一个文件是不常见的方法,除非你真的知道自己在做什么,否则我不建议你这样做。
相反,我会分离蜘蛛 IO 函数,并在开始另一个之前等待一个动作完成 - 考虑到您的线程未同步,这将使程序更高效并使其工作: ) 如果您想要一个代码示例来说明如何在您的上下文中执行此操作,请提出要求,我很乐意提供。
我同意 A. Abramov 的回答。
这只是我的一个想法。您可以在您选择的数据库中创建两个表,然后在两个蜘蛛完成爬网后合并它们。您必须跟踪日志进入的时间,以便您可以根据收到的时间对日志进行排序。然后您可以将数据库转储到您想要的任何文件类型中。这样一来,程序不必等待一个进程完成就可以写入文件,您也不必进行任何多线程编程。
更新:
实际上,根据您的蜘蛛 运行 的长度,您可以将日志输出和时间存储到字典中。时间是键,日志输出是值。这比初始化数据库更容易。然后您可以按键顺序将字典转储到您的文件中。