Logstash is not migrating data to Elasticsearch

I have a bunch of CSV files that I need to migrate to Elasticsearch. I am using Logstash version 7.16.3, and the index has already been created on Elasticsearch with the correct mapping. The configuration file is as follows:

input {
    file {
        path => "C:/Users/fr-pa/Documents/wikidata/extracted/*.csv"
        start_position => "beginning"
        sincedb_path => "NULL"
    }
}

filter {
    csv {
        separator => ","
        columns => ["id", "type", "arlabel", "enlabel", "araliases",
                    "enaliases", "ardescription", "endescription", "maincategory",
                    "arwiki", "enwiki", "arwikiquote", "enwikiquote"]
    }
}

output {
    elasticsearch {
        hosts => "http://localhost:9200/"
        index => "wikidata_index"
    }
    stdout {
    }
}

But the data is not being migrated. This is Logstash's output:

Does anyone know what the problem is?

This is my index.

Index settings:

request_body = {
    "settings": {
    "analysis": {
      "filter": {
        "arabic_stop": {
          "type":       "stop",
          "stopwords":  "_arabic_"
        },
        "arabic_keywords": {
          "type":       "keyword_marker",
          "keywords":   ["مثال"]
        },
        "arabic_stemmer": {
          "type":       "stemmer",
          "language":   "arabic"
        },
        "english_stop": {
          "type":       "stop",
          "stopwords":  "_english_"
        },
        "english_keywords": {
          "type":       "keyword_marker",
          "keywords":   ["example"]
        },
        "english_stemmer": {
          "type":       "stemmer",
          "language":   "english"
        },
        "english_possessive_stemmer": {
          "type":       "stemmer",
          "language":   "possessive_english"
        }
      },
      "analyzer": {
        "rebuilt_arabic": {
          "tokenizer":  "standard",
          "filter": [
            "lowercase",
            "decimal_digit",
            "arabic_stop",
            "arabic_normalization",
            "arabic_keywords",
            "arabic_stemmer"
          ]
        },
       "comma_split":{
          "type" : "pattern",
          "pattern" : ","
        },
        "rebuilt_english": {
          "tokenizer":  "standard",
          "filter": [
            "english_possessive_stemmer",
            "lowercase",
            "english_stop",
            "english_keywords",
            "english_stemmer"
          ]
        }
      }
    }
  } ,
   "mappings": {
        "properties": {
          "id": {
          "type": "keyword",
          "ignore_above": 256
        },
          "type": {
          "type": "text",
          "analyzer": "comma_split"
        },
        "arlabel": {
          "type": "text",
          "analyzer": "rebuilt_arabic"
        },
        "enlabel": {
          "type": "text",
          "analyzer": "rebuilt_english"
        },
        "araliases": {
          "type": "text",
          "analyzer": "comma_split"
        },
        "enaliases": {
          "type": "text",
          "analyzer": "comma_split"
        },
        "ardescription":{
          "type": "text",
          "analyzer": "rebuilt_arabic"
        },
          "endescription":{
          "type": "text",
          "analyzer": "rebuilt_english"
        },
          "maincategory":{
          "type": "text",
          "analyzer": "comma_split"
        },
          "arwiki":{
          "type": "text",
          "analyzer": "rebuilt_arabic"
        },
          "enwiki":{
          "type": "text",
          "analyzer": "rebuilt_english"
        },
          "arwikiquote":{
          "type": "text",
          "analyzer": "rebuilt_arabic"
        },
        "enwikiquote": {
          "type": "text",
          "analyzer": "rebuilt_english"
        }
      }
    }
}
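
For reference, a minimal sketch of creating the index with this request body via the Python client, assuming an elasticsearch-py 7.x client named `es` (the client instance and the `indices.create` call are illustrative assumptions; the post only states the index was already created):

from elasticsearch import Elasticsearch

# Assumed client, pointing at the same host as the Logstash output above
es = Elasticsearch("http://localhost:9200")

# Create the index with the settings and mappings from request_body,
# skipping the call if the index already exists
if not es.indices.exists(index="wikidata_index"):
    es.indices.create(index="wikidata_index", body=request_body)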

Note that some fields have empty values. I also tried using the Python bulk helpers class to insert the data:

# es is an Elasticsearch client and full_path the path of one CSV file
with open(full_path, encoding="utf8") as f:
    reader = csv.DictReader(f)
    print(reader)
    helpers.bulk(es, reader, index='wikidata_index')

The error that is raised is:

C:\Users\fr-pa\Documents\wikidata\extracted\till_Q10091689_item.csv
<csv.DictReader object at 0x0000028E86C47EB0>
---------------------------------------------------------------------------
BulkIndexError                            Traceback (most recent call last)
<ipython-input-42-3849641bd8f9> in <module>
      5         reader = csv.DictReader(f)
      6         print(reader)
----> 7         helpers.bulk(es, reader, index='wikidata_index')

C:\ProgramData\Anaconda3\lib\site-packages\elasticsearch\helpers\actions.py in bulk(client, actions, stats_only, ignore_status, *args, **kwargs)
    408     # make streaming_bulk yield successful results so we can count them
    409     kwargs["yield_ok"] = True
--> 410     for ok, item in streaming_bulk(
    411         client, actions, ignore_status=ignore_status, *args, **kwargs
    412     ):

C:\ProgramData\Anaconda3\lib\site-packages\elasticsearch\helpers\actions.py in streaming_bulk(client, actions, chunk_size, max_chunk_bytes, raise_on_error, expand_action_callback, raise_on_exception, max_retries, initial_backoff, max_backoff, yield_ok, ignore_status, *args, **kwargs)
    327 
    328             try:
--> 329                 for data, (ok, info) in zip(
    330                     bulk_data,
    331                     _process_bulk_chunk(

C:\ProgramData\Anaconda3\lib\site-packages\elasticsearch\helpers\actions.py in _process_bulk_chunk(client, bulk_actions, bulk_data, raise_on_exception, raise_on_error, ignore_status, *args, **kwargs)
    254             raise_on_error=raise_on_error,
    255         )
--> 256     for item in gen:
    257         yield item
    258 

C:\ProgramData\Anaconda3\lib\site-packages\elasticsearch\helpers\actions.py in _process_bulk_chunk_success(resp, bulk_data, ignore_status, raise_on_error)
    185 
    186     if errors:
--> 187         raise BulkIndexError("%i document(s) failed to index." % len(errors), errors)
    188 
    189 

BulkIndexError: ('500 document(s) failed to index.', [{'index': {'_index': 'wikidata_index', '_type': '_doc', '_id': 'dbxzon4BOVq7OZfct2-t', 'status': 400, 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'illegal_argument_exception', 'reason': 'field name cannot be an empty string'}},

The problem can be seen in the last line of the stack trace:

BulkIndexError: ('500 document(s) failed to index.', [{'index': {'_index': 'wikidata_index', '_type': '_doc', '_id': 'dbxzon4BOVq7OZfct2-t', 'status': 400, 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'illegal_argument_exception', 'reason': 'field name cannot be an empty string'}},

You need to remove or replace the fields that have an empty key, for example this one.
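
A minimal sketch of that idea, assuming the same `es` client and `full_path` as in the question; it drops every column whose header is empty before handing the rows to the bulk helper (`clean_rows` is an illustrative name, not from the original code):

import csv
from elasticsearch import helpers

def clean_rows(path):
    # Yield each CSV row with empty keys removed, so no document
    # reaches Elasticsearch with a field name of ""
    with open(path, encoding="utf8") as f:
        for row in csv.DictReader(f):
            yield {key: value for key, value in row.items() if key}

helpers.bulk(es, clean_rows(full_path), index='wikidata_index')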

The solution that worked for me:

actions = []
with open(full_path, encoding="utf8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        # Skip rows whose type matches the excluded Wikidata items
        if row['type'] not in ['Q4167410', 'Q4167836']:
            # Values exported as the literal string "<class 'str'>" are replaced with None
            if row['araliases'] != "<class 'str'>":
                araliases = row['araliases']
            else:
                araliases = None
            if row['enaliases'] != "<class 'str'>":
                enaliases = row['enaliases']
            else:
                enaliases = None
            if row['maincategory'] != "<class 'str'>":
                maincategory = row['maincategory']
            else:
                maincategory = None
            wikidata_item = {
                "id": row['id'],
                "type": row['type'],
                "arlabel": row['arlabel'],
                "enlabel": row['enlabel'],
                "araliases": araliases,
                "enaliases": enaliases,
                "ardescription": row['ardescription'],
                "endescription": row['endescription'],
                "maincategory": maincategory,
                "arwiki": row['arwiki'],
                "enwiki": row['enwiki'],
                "arwikiquote": row['arwikiquote']
            }
            actions.append(wikidata_item)
# print(actions)
helpers.bulk(es, actions, index='wikidata_index')
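
Note that `helpers.bulk` accepts plain dictionaries like the ones in `actions`: each dict is sent as the document `_source` of an index action, and the `index='wikidata_index'` keyword supplies the target index, so no explicit action metadata is needed here.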