为什么我的管道 return 以前修改过的项目?
Why my pipeline return previous modified items?
我创建了一个管道来保存 ElasticSearch 上的每个项目。在此管道上,我检查项目是否已存在以检查管理员是否覆盖某些字段,以强制重新索引(获取此字段并 save/override 它在新项目上)
class InsertItemOnElasticSearch(object):
buffer = []
@classmethod
def from_crawler( cls, crawler ):
# Init ES connection
# Get uniq ID
def got_id( self, item ):
# Return uniq ID of item
# Check if insert or update
def check_item( self, item ):
item_id = self.got_id( item )
type = 'create'
is_exist = self.es.search(...)
if is_exist[ 'hits' ][ 'total' ] == 1:
type = 'index'
item_tmp = is_exist[ 'hits' ][ 'hits' ][0][ '_source' ]
is_override_by_admin = item_tmp.get( 'is_override_by_admin', False )
if is_override_by_admin:
...
try:
my_field = item_tmp.get( 'my_field' )
if my_field:
item[ 'my_field' ] = my_field
except:
pass
return self.index_item( item, type, item_id )
# Format indexation
def index_item( self, item, op_type, item_id ):
# Add es_action to buffer
# Send buffer to ES
def save_items( self ):
helpers.bulk( self.es, self.buffer )
# Process item send to pipelines
def process_item( self, item, spider ):
return self.check_item( item )
# Send buffer when spider closed
def close_spider( self, spider ):
if len( self.buffer ):
self.save_items()
但是,当一个产品存在并且有 my_field 填充时,脚本会在所有下一个项目上保存相同的内容,尽管这个字段不存在。所以我所有的数据都损坏了..
有人知道为什么吗?
我修好了。这是因为在蜘蛛上我打电话
for resp in self.gotVariantsListing(): # On product page
...
yield resp
...
def gotVariantsListing(): # To get all variants of product
...
for x in i:
yield Request( MyURL, self.ajaxFunction, meta={ 'item': item }, ... )
...
def ajaxFunction(): # To got variant specific attributs
item = response.meta[ 'item' ]
...
yield item
所以我更改为 meta={ 'item': item.copy() }
,现在一切正常。感谢记得我.copy().. ;-)
我们不更改爬虫进程,因此项目引用对于所有 ajaxFunction 调用都是相同的。为此,管道上的所有更改在蜘蛛进程上都是可读的
我创建了一个管道来保存 ElasticSearch 上的每个项目。在此管道上,我检查项目是否已存在以检查管理员是否覆盖某些字段,以强制重新索引(获取此字段并 save/override 它在新项目上)
class InsertItemOnElasticSearch(object):
buffer = []
@classmethod
def from_crawler( cls, crawler ):
# Init ES connection
# Get uniq ID
def got_id( self, item ):
# Return uniq ID of item
# Check if insert or update
def check_item( self, item ):
item_id = self.got_id( item )
type = 'create'
is_exist = self.es.search(...)
if is_exist[ 'hits' ][ 'total' ] == 1:
type = 'index'
item_tmp = is_exist[ 'hits' ][ 'hits' ][0][ '_source' ]
is_override_by_admin = item_tmp.get( 'is_override_by_admin', False )
if is_override_by_admin:
...
try:
my_field = item_tmp.get( 'my_field' )
if my_field:
item[ 'my_field' ] = my_field
except:
pass
return self.index_item( item, type, item_id )
# Format indexation
def index_item( self, item, op_type, item_id ):
# Add es_action to buffer
# Send buffer to ES
def save_items( self ):
helpers.bulk( self.es, self.buffer )
# Process item send to pipelines
def process_item( self, item, spider ):
return self.check_item( item )
# Send buffer when spider closed
def close_spider( self, spider ):
if len( self.buffer ):
self.save_items()
但是,当一个产品存在并且有 my_field 填充时,脚本会在所有下一个项目上保存相同的内容,尽管这个字段不存在。所以我所有的数据都损坏了..
有人知道为什么吗?
我修好了。这是因为在蜘蛛上我打电话
for resp in self.gotVariantsListing(): # On product page
...
yield resp
...
def gotVariantsListing(): # To get all variants of product
...
for x in i:
yield Request( MyURL, self.ajaxFunction, meta={ 'item': item }, ... )
...
def ajaxFunction(): # To got variant specific attributs
item = response.meta[ 'item' ]
...
yield item
所以我更改为 meta={ 'item': item.copy() }
,现在一切正常。感谢记得我.copy().. ;-)
我们不更改爬虫进程,因此项目引用对于所有 ajaxFunction 调用都是相同的。为此,管道上的所有更改在蜘蛛进程上都是可读的