在 Python 中将 CSV 索引到 ElasticSearch
Index CSV to ElasticSearch in Python
希望在不使用 Logstash 的情况下将 CSV 文件编入 ElasticSearch 的索引。
我正在使用 elasticsearch-dsl
高级库。
给定一个 header 的 CSV,例如:
name,address,url
adam,hills 32,http://rockit.com
jane,valleys 23,http://popit.com
按字段索引所有数据的最佳方法是什么?最终我想让每一行看起来像这样
{
"name": "adam",
"address": "hills 32",
"url": "http://rockit.com"
}
使用较低级别的 elasticsearch-py
库可以更轻松地完成此类任务:
from elasticsearch import helpers, Elasticsearch
import csv
es = Elasticsearch()
with open('/tmp/x.csv') as f:
reader = csv.DictReader(f)
helpers.bulk(es, reader, index='my-index', doc_type='my-type')
如果您想从 .tsv/.csv
创建具有严格类型和模型的 elasticsearch
数据库以实现更好的过滤,您可以这样做:
class ElementIndex(DocType):
ROWNAME = Text()
ROWNAME = Text()
class Meta:
index = 'index_name'
def indexing(self):
obj = ElementIndex(
ROWNAME=str(self['NAME']),
ROWNAME=str(self['NAME'])
)
obj.save(index="index_name")
return obj.to_dict(include_meta=True)
def bulk_indexing(args):
# ElementIndex.init(index="index_name")
ElementIndex.init()
es = Elasticsearch()
//here your result dict with data from source
r = bulk(client=es, actions=(indexing(c) for c in result))
es.indices.refresh()
希望在不使用 Logstash 的情况下将 CSV 文件编入 ElasticSearch 的索引。
我正在使用 elasticsearch-dsl
高级库。
给定一个 header 的 CSV,例如:
name,address,url
adam,hills 32,http://rockit.com
jane,valleys 23,http://popit.com
按字段索引所有数据的最佳方法是什么?最终我想让每一行看起来像这样
{
"name": "adam",
"address": "hills 32",
"url": "http://rockit.com"
}
使用较低级别的 elasticsearch-py
库可以更轻松地完成此类任务:
from elasticsearch import helpers, Elasticsearch
import csv
es = Elasticsearch()
with open('/tmp/x.csv') as f:
reader = csv.DictReader(f)
helpers.bulk(es, reader, index='my-index', doc_type='my-type')
如果您想从 .tsv/.csv
创建具有严格类型和模型的 elasticsearch
数据库以实现更好的过滤,您可以这样做:
class ElementIndex(DocType):
ROWNAME = Text()
ROWNAME = Text()
class Meta:
index = 'index_name'
def indexing(self):
obj = ElementIndex(
ROWNAME=str(self['NAME']),
ROWNAME=str(self['NAME'])
)
obj.save(index="index_name")
return obj.to_dict(include_meta=True)
def bulk_indexing(args):
# ElementIndex.init(index="index_name")
ElementIndex.init()
es = Elasticsearch()
//here your result dict with data from source
r = bulk(client=es, actions=(indexing(c) for c in result))
es.indices.refresh()