如何将 Scrapy Web 爬虫与 Luigi 数据管道集成?

How to integrate a Scrapy Web Crawler with a Luigi Data Pipeline?

(老用户&&第一个问题&&紧张的问)是真的

我目前正在构建一个 Python 后端,它将部署到具有以下架构的单个 AWS EC2 实例:


|---- 数据源 -----| 临时存储 | - 数据处理 --- | ----- DB ---- |

网络爬虫数据----*保存到S3* =\
API数据----------------*保存到S3* ==> Lugi数据管道 --> MongoDB


如上所示,我们有不同的获取数据的方式(即 API 请求、Scrapy Web 爬虫等...)但是 tricky/difficult 部分提出了一个简单而错误的方法- 将接收到的数据与 Luigi 数据管道连接的容忍方式。

有没有办法将网络爬虫的输出集成到 Luigi 数据管道中?如果不是,弥合 HTTP 数据获取器和 Luigi 任务之间差距的最佳方法是什么?

如有任何建议、文档或文章,我们将不胜感激!此外,如果您需要更多详细信息,我会尽快将它们提供给您。

谢谢!

我没用过luigi。但我确实使用scrapy。我猜真正的问题是你如何以合理的方式通知 luigi 有新数据要处理?

有一个类似的问题你可以从这里学习:When a new file arrives in S3, trigger luigi task 也许你们在同一个地方工作:).

我强烈建议在 scrapyd 中托管您的蜘蛛并使用 scrapyd-client 来驱动它。如果您尝试 运行 在其他使用扭曲库的工具中进行 scrapy(不确定 luigi 是否这样做),则会弹出各种毛茸茸的东西。我会使用 scrapyd-client 驱动蜘蛛,让你的蜘蛛 post 触发 url 告诉 luigi 以某种方式开始任务。

同样,因为我没有使用过 luigi,所以我不知道那里的细节......但你不想成为 busy-checking/polling 来查看工作是否完成。

我有一个 django 网络应用程序,我启动了蜘蛛程序,存储了来自 scrapyd-client 的 jobid,完成后在肩膀上轻按 json,然后我使用 celery 和 solr 来摄取数据。

编辑以包含以下评论中的管道代码:

        for fentry in item['files']:

            # open and read the file
            pdf = open(rootdir+os.path.sep+fentry['path'],'rb').read()

            # just in case we need cookies
            cj     = CookieJar()
            opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cj))

            # set the content type
            headers = {
            'Content-Type': 'application/json'
            }

            #fill out the object
            json_body = json.dumps({
            'uid'   : 'indeed-'+item['indeed_uid'],
            'url'   : item['candidate_url'],
            'first_name' : fname,
            'last_name'  : lname,
            'pdf'     : base64.b64encode(pdf).decode(),
            'jobid': spider.jobid
            }).encode()
            #, ensure_ascii=False)

            # send the POST and read the result
            request = urllib.request.Request('http://localhost:8080/api/someapi/', json_body, headers)
            request.get_method = lambda: 'POST'
            response = opener.open(request)