ReactorAlreadyRunning Scrapy

I'm using Scrapy as a crawler in Python. My problem is that I can't start multiple crawl jobs in parallel.

getJobs

    def getJobs(self):
        mysql = MysqlConnector.Mysql()
        db = mysql.getConnection()
        cur = db.cursor()
        cur.execute("SELECT * FROM job WHERE status=0 OR days>0")
        print "Get new jobs"

        # Map each result row onto a JobModel
        joblist = []
        for row in cur.fetchall():
            job = JobModel.JobModel()
            job.id = row[0]
            job.user_id = row[1]
            job.name = row[2]
            job.url = row[3]
            job.api = row[4]
            job.max_pages = row[5]
            job.crawl_depth = row[6]
            job.processing_patterns = row[7]
            job.status = row[8]
            job.days = row[9]
            job.ajax = row[11]
            joblist.append(job)
        db.close()  # close the connection after all rows are read, not inside the loop

        # Process the jobs, one daemon thread each
        for job in joblist:
            processJob = ProcessJob.ProcessJob()
            th = Thread(target=processJob.processJob, args=(job,))
            th.daemon = True
            th.start()

processJob

    def processJob(self, job):
        # Mark the job as running
        mysql = MysqlConnector.Mysql()
        db = mysql.getConnection()
        cur = db.cursor()
        job.status = 1
        cur.execute("UPDATE job SET status=1 WHERE id=%s", (job.id,))  # parameterized instead of string concatenation
        db.commit()
        db.close()

        # Start a new crawler
        crawler = spider.MySpider
        print job.ajax
        if job.ajax == 1:
            crawler.custom_settings = CrawlerSettings.ajax_settings
        else:
            crawler.custom_settings = CrawlerSettings.normal_settings
        configure_logging()
        runner = CrawlerRunner()
        runner.crawl(crawler, job=job)
        d = runner.join()
        d.addBoth(lambda _: reactor.stop())
        reactor.run(0)  # raises ReactorAlreadyRunning on every thread after the first

getJobs polls the database every 5 seconds for new jobs and hands each one to processJob. The problem is that as soon as I start more than one crawl job, I get the following exception:

    Traceback (most recent call last):
      File "/usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 810, in __bootstrap_inner
        self.run()
      File "/usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 763, in run
        self.__target(*self.__args, **self.__kwargs)
      File "/Users/fabianlurz/c_crawler/c_crawler/jobs/ProcessJob.py", line 31, in processJob
        reactor.run(0)
      File "/usr/local/lib/python2.7/site-packages/twisted/internet/base.py", line 1193, in run
        self.startRunning(installSignalHandlers=installSignalHandlers)
      File "/usr/local/lib/python2.7/site-packages/twisted/internet/base.py", line 1173, in startRunning
        ReactorBase.startRunning(self)
      File "/usr/local/lib/python2.7/site-packages/twisted/internet/base.py", line 682, in startRunning
        raise error.ReactorAlreadyRunning()
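
The last frames show the cause: Twisted's reactor is a process-wide singleton, so the second daemon thread's reactor.run(0) finds the reactor already started by the first thread. The error is easy to reproduce in isolation; a minimal sketch (not from the original post, threads aren't even needed):

    from twisted.internet import reactor
    from twisted.internet.error import ReactorAlreadyRunning

    def run_again():
        # called from inside the already-running reactor
        try:
            reactor.run(0)  # second run() in the same process
        except ReactorAlreadyRunning:
            print "second reactor.run() raised ReactorAlreadyRunning"
        reactor.stop()

    reactor.callLater(0, run_again)
    reactor.run(0)  # the first call starts the singleton reactor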

I already know that the reactor can't be started twice - but there must be a way to run multiple crawling instances on one "server". So how can I achieve that?
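
For reference, Scrapy's documentation covers running several crawls in the same process with a single reactor: queue every crawl on one CrawlerRunner and start the reactor exactly once. A minimal sketch along those lines (jobs is assumed to be the list built by getJobs):

    from twisted.internet import reactor
    from scrapy.crawler import CrawlerRunner
    from scrapy.utils.log import configure_logging
    from spiders import spider  # the question's spider module

    configure_logging()
    runner = CrawlerRunner()
    for job in jobs:                     # queue every crawl up front
        runner.crawl(spider.MySpider, job=job)
    d = runner.join()                    # fires when all queued crawls finish
    d.addBoth(lambda _: reactor.stop())
    reactor.run()                        # the reactor is started exactly once

This works when all jobs are known up front; a poll-every-5-seconds design needs extra plumbing, which is why the answer below moves each crawl into its own process instead.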

Got it working

    from billiard import Process
    from model import CrawlerSettings
    from scrapy.crawler import CrawlerRunner
    from scrapy.utils.log import configure_logging
    from spiders import spider
    from twisted.internet import reactor
    from utility import MysqlConnector


    class ProcessJob():
        def processJob(self, job):
            # Mark the job as running
            mysql = MysqlConnector.Mysql()
            db = mysql.getConnection()
            cur = db.cursor()
            job.status = 1
            cur.execute("UPDATE job SET status=1 WHERE id=%s", (job.id,))
            db.commit()
            db.close()

            # Start the new crawler in its own process
            configure_logging()
            webspider = spider.MySpider
            if job.ajax == 1:
                webspider.custom_settings = CrawlerSettings.ajax_settings
            else:
                webspider.custom_settings = CrawlerSettings.normal_settings
            crawler = UrlCrawlerScript(webspider, job)
            crawler.start()


    class UrlCrawlerScript(Process):
        def __init__(self, spider, job):
            Process.__init__(self)
            self.crawler = CrawlerRunner()
            self.crawler.crawl(spider, job=job)

        def run(self):
            # Runs in the forked child process: its reactor has never been
            # started, so reactor.run() cannot collide with other jobs
            d = self.crawler.join()
            d.addBoth(lambda _: reactor.stop())
            reactor.run(0)

Using billiard to spawn a separate process per crawl.
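
Why this works: billiard is Celery's fork of the stdlib multiprocessing package, and the parent process never starts its own reactor, so every forked child inherits an unstarted reactor and may call reactor.run() itself. The daemon threads in getJobs then become unnecessary; a minimal sketch of the 5-second polling loop driving it (serve is a hypothetical method, and getJobs is assumed to be changed to return joblist instead of spawning threads):

    import time

    def serve(self):
        # hypothetical driver loop; polls the database every 5 seconds
        while True:
            for job in self.getJobs():        # assumed to return the job list
                ProcessJob().processJob(job)  # forks one child process per job
            time.sleep(5)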