How to requeue messages when the spider fails, using Scrapy and RabbitMQ (pika)
I am trying to use pika and Scrapy together, with a consumer that invokes the spider for each message. I have a consumer.py and a Scrapy spider spider.py.

The spider runs inside the consumer with the arguments sent by the producer, and I use used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag) to remove the message.

I expect the message to be removed once the spider has finished the job, and to be requeued if an error occurs. When the spider runs normally everything looks fine: the message is removed and the job completes. However, when an error occurs while the spider is crawling, the message is still removed even though the job did not complete, so the message is lost.

Looking at the RabbitMQ management UI, I noticed that the message count drops to 0 while the crawler is still running (the console has not yet reported that the job finished).

I wonder whether this is because Scrapy is asynchronous, so while the line run_spider(message=decodebody) is still running, the next line used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag) does not wait for the spider to finish.

How can I fix this? I want the message to be removed only after the spider has finished the job correctly. Here is my consumer code:
from scrapy.utils.project import get_project_settings

setup()  # for CrawlerRunner
settings = get_project_settings()


def get_message(used_channel, basic_deliver, properties, body):
    decodebody = bytes.decode(body)
    try:
        run_spider(message=decodebody)
        used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)
    except:
        channel.basic_reject(delivery_tag=basic_deliver.delivery_tag)


def run_spider(message):
    crawler = CrawlerRunner(settings)
    crawler.crawl(MySpider, message=message)


while(True):
    try:
        # blocking connection
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbit_host))
        channel = connection.channel()

        # declare exchange, the setting must be same as producer
        channel.exchange_declare(
            exchange=rabbit_exchange,
            exchange_type='direct',
            durable=True,
            auto_delete=False
        )

        # declare queue, the setting must be same as producer
        channel.queue_declare(
            queue=rabbit_queue,
            durable=True,
            exclusive=False,
            auto_delete=False
        )

        # bind the setting
        channel.queue_bind(
            exchange=rabbit_exchange,
            queue=rabbit_queue,
            routing_key=routing_key
        )

        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(
            queue=rabbit_queue,
            on_message_callback=get_message,
            auto_ack=False
        )

        logger.info(' [*] Waiting for messages. To exit press CTRL+C')

        # start consuming
        channel.start_consuming()

    except pika.exceptions.ConnectionClosed as err:
        print('ConnectionClosed error:', err)
        continue
    # Do not recover on channel errors
    except pika.exceptions.AMQPChannelError as err:
        print("Caught a channel error: {}, stopping...".format(err))
        break
    # Recover on all other connection errors
    except pika.exceptions.AMQPConnectionError as err:
        print("Connection was closed, retrying...", err)
        continue
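For context on the suspicion above: CrawlerRunner.crawl() is non-blocking. It schedules the crawl on the Twisted reactor and immediately returns a Deferred that fires only when the crawl has finished, so the basic_ack on the next line runs long before the spider is done. A minimal sketch of that timing (reusing MySpider, settings, and decodebody from the snippet above; the prints are purely illustrative):

def run_spider(message):
    crawler = CrawlerRunner(settings)
    d = crawler.crawl(MySpider, message=message)  # returns a Deferred right away
    d.addBoth(lambda _: print('crawl finished'))  # fires once the crawl is actually done
    return d

run_spider(message=decodebody)
print('this prints before "crawl finished"')  # crawl() did not block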
I found someone handling multithreading with the pika library and MQ; he used .is_alive to check whether the thread had finished, so I followed the same idea. Since the Scrapy crawl also runs in the background, I added return crawler and check crawler._active before removing the message:
def run_spider(news_info):
    # run spider with CrawlerRunner
    crawler = CrawlerRunner(settings)
    # run the spider script
    crawler.crawl(UrlSpider, news_info=news_info)
    return crawler


crawler = run_spider(news_info=decodebody)

# wait until the crawler is done
while (len(crawler._active) > 0):
    time.sleep(1)

used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)
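A note on this approach: crawler._active is a private attribute of CrawlerRunner, so it may change between Scrapy versions. A possible alternative (a sketch only, assuming the setup() call in the consumer comes from crochet, the usual way to drive CrawlerRunner from blocking code) is to wait on the Deferred that crawl() returns using crochet's wait_for, and only ack once it has fired; on timeout or failure the message is rejected and requeued:

from crochet import setup, wait_for
from scrapy.crawler import CrawlerRunner

setup()  # runs the Twisted reactor in a background thread

@wait_for(timeout=600.0)  # blocks the calling (pika) thread until the Deferred fires
def run_spider(news_info):
    crawler = CrawlerRunner(settings)
    return crawler.crawl(UrlSpider, news_info=news_info)  # Deferred that fires when the crawl ends

# inside the consumer callback:
try:
    run_spider(news_info=decodebody)  # raises crochet.TimeoutError if the crawl never finishes
    used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)
except Exception:
    # basic_reject requeues by default, so the message will be redelivered
    used_channel.basic_reject(delivery_tag=basic_deliver.delivery_tag)

Keep in mind that Scrapy catches exceptions raised inside spider callbacks and only logs them, so if "the job failed" means an error inside a callback, that failure still has to be surfaced explicitly (for example via crawler.stats) before deciding whether to ack.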