Inserting arrays in Elasticsearch via PySpark
I have a case much like this one:
Sample dataframe:
from pyspark.sql.types import *

schema = StructType([  # schema
    StructField("id", StringType(), True),
    StructField("email", ArrayType(StringType()), True)])

df = spark.createDataFrame([{"id": "id1"},
                            {"id": "id2", "email": None},
                            {"id": "id3", "email": ["email1@gmail.com"]},
                            {"id": "id4", "email": ["email1@gmail.com", "email2@gmail.com"]}],
                           schema=schema)
df.show(truncate=False)
+---+------------------------------------+
|id |email |
+---+------------------------------------+
|id1|null |
|id2|null |
|id3|[email1@gmail.com] |
|id4|[email1@gmail.com, email2@gmail.com]|
+---+------------------------------------+
I want to insert this data into Elasticsearch, and from my research I have to convert it into an indexable format, that is, a pair RDD of (doc_id, document) tuples, since saveAsNewAPIHadoopFile writes key/value pairs:
def parseTest(r):
    if r['email'] is None:
        return r['id'], {"id": r['id']}
    else:
        return r['id'], {"id": r['id'], "email": r['email']}

df2 = df.rdd.map(lambda row: parseTest(row))
df2.top(4)
[('id4', {'email': ['email1@gmail.com', 'email2@gmail.com'], 'id': 'id4'}),
('id3', {'email': ['email1@gmail.com'], 'id': 'id3'}),
('id2', {'id': 'id2'}),
('id1', {'id': 'id1'})]
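For reference, the same pair structure can be built for any schema by dropping null fields from the row dict. A minimal, hypothetical equivalent of parseTest (names are illustrative, not from the original post):

def parse_row(r):
    # keep only non-null fields; the row's id doubles as the document key
    doc = {k: v for k, v in r.asDict().items() if v is not None}
    return r['id'], doc

df2 = df.rdd.map(parse_row)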
Then I try to insert it:
es_conf = {"es.nodes": "node1.com,node2.com",
           "es.resource": "index/type"}

df2.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_conf)
And I get:
org.apache.spark.SparkException: Data of type java.util.ArrayList
cannot be used
Spark v 2.1.0
ES v 2.4.4
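The error points at the email array: the connector's map conversion handles scalar field values but apparently not list values (see the note just below that the write succeeds without the email field). A hypothetical fallback, at the cost of losing the array typing in ES, is to flatten the list into a delimited string:

def parse_flat(r):
    # hypothetical workaround: join the array so every field is a scalar
    doc = {"id": r['id']}
    if r['email'] is not None:
        doc["email"] = ",".join(r['email'])  # array typing is lost in ES
    return r['id'], doc

df2_flat = df.rdd.map(parse_flat)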
It works fine without the email field. I found some suggested solutions using es.output.json: true together with json.dumps, but they seemed to apply to version 5, so I tried it on another cluster of mine running ES v5:
import json

df3 = df2.map(json.dumps)
df3.top(4)
['["id4", {"email": ["email1@gmail.com", "email2@gmail.com"], "id": "id4"}]',
'["id3", {"email": ["email1@gmail.com"], "id": "id3"}]',
'["id2", {"id": "id2"}]',
'["id1", {"id": "id1"}]']
es_conf2 = {"es.nodes": "anothernode1.com,anothernode2.com",
            "es.output.json": "true",
            "es.resource": "index/type"}

df3.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_conf2)
Then I get:
RDD element of type java.lang.String cannot be used
Spark v 2.1.0
ES v 5.2.0
Feels bad, man.
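A note on this second error: df2.map(json.dumps) serializes each whole (key, value) tuple into a single string, so the result is no longer the pair RDD that saveAsNewAPIHadoopFile expects. A commonly suggested variant (a sketch only, not verified here) keeps the key, serializes just the document, and tells the connector the values are pre-built JSON via es.input.json:

import json

# keep (id, json_string) pairs instead of bare strings
df3 = df2.mapValues(json.dumps)

es_conf_json = {"es.nodes": "anothernode1.com,anothernode2.com",
                "es.resource": "index/type",
                "es.input.json": "true"}  # values are already JSON documents

df3.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_conf_json)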
I found another way to do the same job, using the write method of the dataframe object.
So, after the first part:
from pyspark.sql.types import *

schema = StructType([  # schema
    StructField("id", StringType(), True),
    StructField("email", ArrayType(StringType()), True)])

df = spark.createDataFrame([{"id": "id1"},
                            {"id": "id2", "email": None},
                            {"id": "id3", "email": ["email1@gmail.com"]},
                            {"id": "id4", "email": ["email1@gmail.com", "email2@gmail.com"]}],
                           schema=schema)
df.show(truncate=False)
+---+------------------------------------+
|id |email |
+---+------------------------------------+
|id1|null |
|id2|null |
|id3|[email1@gmail.com] |
|id4|[email1@gmail.com, email2@gmail.com]|
+---+------------------------------------+
All you need is:
df.write \
    .format("org.elasticsearch.spark.sql") \
    .option("es.nodes", "node1.com,node2.com") \
    .option("es.resource", "index/type") \
    .option("es.mapping.id", "id") \
    .save()
No need to convert to an RDD, nor to modify the data in any way.
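To sanity-check the result, the same connector can read the index back into a dataframe. A minimal sketch, assuming the es-hadoop connector jar (e.g. elasticsearch-spark-20_2.11-5.2.0.jar) is on the classpath; you may also need .mode("append") on the write above if the index already exists:

readback = spark.read \
    .format("org.elasticsearch.spark.sql") \
    .option("es.nodes", "node1.com,node2.com") \
    .option("es.resource", "index/type") \
    .load()

readback.show(truncate=False)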