在 Python DSL 中使用批量时 Elasticsearch 忽略映射
Elasticsearch ignores mapping when using bulk in Python DSL
我正在尝试将 CSV 文件上传到 elasticsearch 索引。假设文件是这样的(没有 headers,只有数据):
bird,10,dog
cat,20,giraffe
这是我的代码:
from elasticsearch_dsl import DocType, Integer, Keyword
from elasticsearch_dsl.connections import connections
from elasticsearch.helpers import bulk
import csv

# Register the default connection that the DSL classes and the bulk helper use.
connections.create_connection(hosts=["localhost"])


class Mapping(DocType):
    """Document whose three fields carry the explicit types wanted in the index."""

    animal1 = Keyword()
    number = Integer()
    animal2 = Keyword()

    class Meta:
        index = "index-name"
        doc_type = "doc-type"


# Set up the index mapping before any document is sent.
Mapping.init()

with open("/path/to/file", "r", encoding="latin-1") as f:
    # NOTE: no ``fieldnames`` are passed, so DictReader consumes the first CSV
    # row ("bird,10,dog") as the header and uses those values as dict keys.
    # This is the behaviour the question is about.
    reader = csv.DictReader(f)
    bulk(
        connections.get_connection(),
        (Mapping(**row).to_dict(True) for row in reader)
    )
问题是 elasticsearch 似乎忽略了映射并将文件的第一行用作 headers(并基于此创建映射)。
编辑:实际上它既使用了我的映射,也把文件的第一行当作了字段名。它生成的映射是:
{
"index-name": {
"mappings": {
"doc-type": {
"properties": {
"10": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"dog": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"animal1": {
"type": "keyword"
},
"animal2": {
"type": "keyword"
},
"bird": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"number": {
"type": "integer"
}
}
}
}
}
}
如果我只创建索引而不上传数据,映射似乎没问题:
{
"index-name": {
"mappings": {
"doc-type": {
"properties": {
"animal1": {
"type": "keyword"
},
"animal2": {
"type": "keyword"
},
"number": {
"type": "integer"
}
}
}
}
}
}
如何让 ES 使用给定的映射?
csv.DictReader 在未指定 fieldnames 时会把文件的第一行当作表头(headers),因此 Elasticsearch 会用第一行的各个元素(bird、10、dog)创建新字段。也就是说,您没有指明每行的第一个元素是 animal1、第二个是 number、第三个是 animal2。下面的代码显式指定了每一列对应的字段,试试这个:
import csv

from elasticsearch import Elasticsearch
from elasticsearch import helpers

index_name = "your_index_name"
doc_type = "your_doc_type"
# change your ip here
esConnector = Elasticsearch(["http://192.168.1.1:9200/"])
# change your path here
csv_path = "/path/to/file"


def generate_actions(path):
    """Yield one bulk ``index`` action per non-empty row of the CSV at *path*.

    The file has no header row, so the three columns are mapped explicitly to
    ``animal1``, ``number`` and ``animal2``. Document ids are assigned
    sequentially starting at 1.
    """
    with open(path, "r", encoding="latin-1", newline="") as f:
        # csv.reader removes the line terminator, so the last column does not
        # end up as "dog\n" the way a plain str.split(",") would leave it.
        rows = (row for row in csv.reader(f) if row)  # skip blank lines
        for contatore, row in enumerate(rows, start=1):
            tripla = {"animal1": row[0], "number": row[1], "animal2": row[2]}
            yield {
                '_op_type': 'index',
                '_index': index_name,
                '_type': doc_type,
                '_id': contatore,
                '_source': tripla,
            }


# Actions are produced lazily, so the whole file is never held in memory.
for success, info in helpers.parallel_bulk(
    client=esConnector,
    actions=generate_actions(csv_path),
    thread_count=4,
):
    if not success:
        print('Doc failed', info)
我正在尝试将 CSV 文件上传到 elasticsearch 索引。假设文件是这样的(没有 headers,只有数据):
bird,10,dog
cat,20,giraffe
这是我的代码:
from elasticsearch_dsl import DocType, Integer, Keyword
from elasticsearch_dsl.connections import connections
from elasticsearch.helpers import bulk
import csv

# Register the default connection that the DSL classes and the bulk helper use.
connections.create_connection(hosts=["localhost"])


class Mapping(DocType):
    """Document whose three fields carry the explicit types wanted in the index."""

    animal1 = Keyword()
    number = Integer()
    animal2 = Keyword()

    class Meta:
        index = "index-name"
        doc_type = "doc-type"


# Set up the index mapping before any document is sent.
Mapping.init()

with open("/path/to/file", "r", encoding="latin-1") as f:
    # NOTE: no ``fieldnames`` are passed, so DictReader consumes the first CSV
    # row ("bird,10,dog") as the header and uses those values as dict keys.
    # This is the behaviour the question is about.
    reader = csv.DictReader(f)
    bulk(
        connections.get_connection(),
        (Mapping(**row).to_dict(True) for row in reader)
    )
问题是 elasticsearch 似乎忽略了映射并将文件的第一行用作 headers(并基于此创建映射)。
编辑:实际上它既使用了我的映射,也把文件的第一行当作了字段名。它生成的映射是:
{
"index-name": {
"mappings": {
"doc-type": {
"properties": {
"10": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"dog": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"animal1": {
"type": "keyword"
},
"animal2": {
"type": "keyword"
},
"bird": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"number": {
"type": "integer"
}
}
}
}
}
}
如果我只创建索引而不上传数据,映射似乎没问题:
{
"index-name": {
"mappings": {
"doc-type": {
"properties": {
"animal1": {
"type": "keyword"
},
"animal2": {
"type": "keyword"
},
"number": {
"type": "integer"
}
}
}
}
}
}
如何让 ES 使用给定的映射?
csv.DictReader 在未指定 fieldnames 时会把文件的第一行当作表头(headers),因此 Elasticsearch 会用第一行的各个元素(bird、10、dog)创建新字段。也就是说,您没有指明每行的第一个元素是 animal1、第二个是 number、第三个是 animal2。下面的代码显式指定了每一列对应的字段,试试这个:
import csv

from elasticsearch import Elasticsearch
from elasticsearch import helpers

index_name = "your_index_name"
doc_type = "your_doc_type"
# change your ip here
esConnector = Elasticsearch(["http://192.168.1.1:9200/"])
# change your path here
csv_path = "/path/to/file"


def generate_actions(path):
    """Yield one bulk ``index`` action per non-empty row of the CSV at *path*.

    The file has no header row, so the three columns are mapped explicitly to
    ``animal1``, ``number`` and ``animal2``. Document ids are assigned
    sequentially starting at 1.
    """
    with open(path, "r", encoding="latin-1", newline="") as f:
        # csv.reader removes the line terminator, so the last column does not
        # end up as "dog\n" the way a plain str.split(",") would leave it.
        rows = (row for row in csv.reader(f) if row)  # skip blank lines
        for contatore, row in enumerate(rows, start=1):
            tripla = {"animal1": row[0], "number": row[1], "animal2": row[2]}
            yield {
                '_op_type': 'index',
                '_index': index_name,
                '_type': doc_type,
                '_id': contatore,
                '_source': tripla,
            }


# Actions are produced lazily, so the whole file is never held in memory.
for success, info in helpers.parallel_bulk(
    client=esConnector,
    actions=generate_actions(csv_path),
    thread_count=4,
):
    if not success:
        print('Doc failed', info)