Logstash is not migrating data to Elasticsearch
I have a bunch of CSV files that I need to migrate to Elasticsearch. I am using Logstash version 7.16.3, and the index has already been created on Elasticsearch with the correct mapping. The configuration file is as follows:
input {
  file {
    path => "C:/Users/fr-pa/Documents/wikidata/extracted/*.csv"
    start_position => "beginning"
    sincedb_path => "NULL"
  }
}

filter {
  csv {
    separator => ","
    columns => ["id", "type", "arlabel", "enlabel", "araliases",
                "enaliases", "ardescription", "endescription", "maincategory",
                "arwiki", "enwiki", "arwikiquote", "enwikiquote"]
  }
}

output {
  elasticsearch {
    hosts => "http://localhost:9200/"
    index => "wikidata_index"
  }
  stdout {
  }
}
However, the data is not being migrated. Logstash output:
Does anyone know where the problem is?
Here is my index.
Index settings:
request_body = {
"settings": {
"analysis": {
"filter": {
"arabic_stop": {
"type": "stop",
"stopwords": "_arabic_"
},
"arabic_keywords": {
"type": "keyword_marker",
"keywords": ["مثال"]
},
"arabic_stemmer": {
"type": "stemmer",
"language": "arabic"
},
"english_stop": {
"type": "stop",
"stopwords": "_english_"
},
"english_keywords": {
"type": "keyword_marker",
"keywords": ["example"]
},
"english_stemmer": {
"type": "stemmer",
"language": "english"
},
"english_possessive_stemmer": {
"type": "stemmer",
"language": "possessive_english"
}
},
"analyzer": {
"rebuilt_arabic": {
"tokenizer": "standard",
"filter": [
"lowercase",
"decimal_digit",
"arabic_stop",
"arabic_normalization",
"arabic_keywords",
"arabic_stemmer"
]
},
"comma_split":{
"type" : "pattern",
"pattern" : ","
},
"rebuilt_english": {
"tokenizer": "standard",
"filter": [
"english_possessive_stemmer",
"lowercase",
"english_stop",
"english_keywords",
"english_stemmer"
]
}
}
}
} ,
"mappings": {
"properties": {
"id": {
"type": "keyword",
"ignore_above": 256
},
"type": {
"type": "text",
"analyzer": "comma_split"
},
"arlabel": {
"type": "text",
"analyzer": "rebuilt_arabic"
},
"enlabel": {
"type": "text",
"analyzer": "rebuilt_english"
},
"araliases": {
"type": "text",
"analyzer": "comma_split"
},
"enaliases": {
"type": "text",
"analyzer": "comma_split"
},
"ardescription":{
"type": "text",
"analyzer": "rebuilt_arabic"
},
"endescription":{
"type": "text",
"analyzer": "rebuilt_english"
},
"maincategory":{
"type": "text",
"analyzer": "comma_split"
},
"arwiki":{
"type": "text",
"analyzer": "rebuilt_arabic"
},
"enwiki":{
"type": "text",
"analyzer": "rebuilt_english"
},
"arwikiquote":{
"type": "text",
"analyzer": "rebuilt_arabic"
},
"enwikiquote": {
"type": "text",
"analyzer": "rebuilt_english"
}
}
}
}
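For completeness, here is a minimal sketch of how this request body is applied when creating the index with the Python client (the client setup and host URL are assumptions based on the Logstash config above, not part of the original setup):

from elasticsearch import Elasticsearch

# Assumption: the same local cluster that the Logstash output points at.
es = Elasticsearch("http://localhost:9200")

# Create the index with the analyzers and mappings defined in request_body.
if not es.indices.exists(index="wikidata_index"):
    es.indices.create(index="wikidata_index", body=request_body)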
Note that some fields have empty values. I also tried using the Python bulk helpers class to insert the data:
with open(full_path,encoding="utf8") as f:
reader = csv.DictReader(f)
print(reader)
helpers.bulk(es, reader, index='wikidata_index')
The error raised is:
C:\Users\fr-pa\Documents\wikidata\extracted\till_Q10091689_item.csv
<csv.DictReader object at 0x0000028E86C47EB0>
---------------------------------------------------------------------------
BulkIndexError Traceback (most recent call last)
<ipython-input-42-3849641bd8f9> in <module>
5 reader = csv.DictReader(f)
6 print(reader)
----> 7 helpers.bulk(es, reader, index='wikidata_index')
C:\ProgramData\Anaconda3\lib\site-packages\elasticsearch\helpers\actions.py in bulk(client, actions, stats_only, ignore_status, *args, **kwargs)
408 # make streaming_bulk yield successful results so we can count them
409 kwargs["yield_ok"] = True
--> 410 for ok, item in streaming_bulk(
411 client, actions, ignore_status=ignore_status, *args, **kwargs
412 ):
C:\ProgramData\Anaconda3\lib\site-packages\elasticsearch\helpers\actions.py in streaming_bulk(client, actions, chunk_size, max_chunk_bytes, raise_on_error, expand_action_callback, raise_on_exception, max_retries, initial_backoff, max_backoff, yield_ok, ignore_status, *args, **kwargs)
327
328 try:
--> 329 for data, (ok, info) in zip(
330 bulk_data,
331 _process_bulk_chunk(
C:\ProgramData\Anaconda3\lib\site-packages\elasticsearch\helpers\actions.py in _process_bulk_chunk(client, bulk_actions, bulk_data, raise_on_exception, raise_on_error, ignore_status, *args, **kwargs)
254 raise_on_error=raise_on_error,
255 )
--> 256 for item in gen:
257 yield item
258
C:\ProgramData\Anaconda3\lib\site-packages\elasticsearch\helpers\actions.py in _process_bulk_chunk_success(resp, bulk_data, ignore_status, raise_on_error)
185
186 if errors:
--> 187 raise BulkIndexError("%i document(s) failed to index." % len(errors), errors)
188
189
BulkIndexError: ('500 document(s) failed to index.', [{'index': {'_index': 'wikidata_index', '_type': '_doc', '_id': 'dbxzon4BOVq7OZfct2-t', 'status': 400, 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'illegal_argument_exception', 'reason': 'field name cannot be an empty string'}},
The problem can be seen in the last line of the stack trace:
BulkIndexError: ('500 document(s) failed to index.', [{'index':
{'_index': 'wikidata_index', '_type': '_doc', '_id':
'dbxzon4BOVq7OZfct2-t', 'status': 400, 'error': {'type':
'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by':
{'type': 'illegal_argument_exception', 'reason': 'field name cannot be
an empty string'}},
You need to remove or replace the fields that have an empty key, for example this one.
The solution that worked for me:
actions = []
with open(full_path, encoding="utf8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        # Skip Wikidata disambiguation pages (Q4167410) and category items (Q4167836).
        if row['type'] not in ['Q4167410', 'Q4167836']:
            # Keep aliases/category only when they hold real data, otherwise use None.
            if row['araliases'] != "<class 'str'>":
                araliases = row['araliases']
            else:
                araliases = None
            if row['enaliases'] != "<class 'str'>":
                enaliases = row['enaliases']
            else:
                enaliases = None
            if row['maincategory'] != "<class 'str'>":
                maincategory = row['maincategory']
            else:
                maincategory = None
            wikidata_item = {
                "id": row['id'],
                "type": row['type'],
                "arlabel": row['arlabel'],
                "enlabel": row['enlabel'],
                "araliases": araliases,
                "enaliases": enaliases,
                "ardescription": row['ardescription'],
                "endescription": row['endescription'],
                "maincategory": maincategory,
                "arwiki": row['arwiki'],
                "enwiki": row['enwiki'],
                "arwikiquote": row['arwikiquote']
            }
            actions.append(wikidata_item)
# print(actions)
helpers.bulk(es, actions, index='wikidata_index')
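As a more generic variant of the advice above, you could also just drop any empty or None keys from every CSV row before handing it to the bulk helper. A minimal sketch, assuming the same es client, full_path, and index name as in the question:

import csv
from elasticsearch import helpers

def clean_rows(path):
    # csv.DictReader maps a blank header column to the key "" and overflow
    # values to the key None; both would trigger
    # "field name cannot be an empty string", so drop them here.
    with open(path, encoding="utf8") as f:
        for row in csv.DictReader(f):
            yield {k: v for k, v in row.items() if k not in (None, "")}

helpers.bulk(es, clean_rows(full_path), index='wikidata_index')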