不安全模式下的 Elasticsearch pyspark 连接
Elasticsearch pyspark connection in insecure mode
我的最终目标是将数据从 hdfs 插入到 elasticsearch,但我面临的问题是连接性
我可以使用以下 curl 命令连接到我的 elasticsearch 节点
curl -u username -X GET https://xx.xxx.xx.xxx:9200/_cat/indices?v' --insecure
但是说到与spark的联系,我做不到。我插入数据的命令是
df.write.mode("append").format('org.elasticsearch.spark.sql').option("es.net.http.auth.user", "username").option("es.net.http.auth.pass", "password").option("es.index.auto.create","true").option('es.nodes', 'https://xx.xxx.xx.xxx').option('es.port','9200').save('my-index/my-doctype')
我得到的错误是
org.elastisearch.hadoop.EsHadoopIllegalArgumentException:Cannot detect ES version - typical this happens if then network/Elasticsearch cluster is not accessible or when targetting a Wan/Cloud instance without the proper setting 'es.nodes.wan.only'
....
....
Caused by: org.elasticseach.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proy settings)- all nodes failed; tried [[xx.xxx.xx.xxx:9200]]
....
...
在这里,什么是 curl --insecure
的 pyspark 等价物
谢谢
你能试试下面的 sparkConfs,
val sparkConf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.es.index.auto.create", "true")
.set("spark.es.nodes", "yourESaddress")
.set("spark.es.port", "9200")
.set("spark.es.net.http.auth.user","")
.set("spark.es.net.http.auth.pass", "")
.set("spark.es.resource", indexName)
.set("spark.es.nodes.wan.only", "true")
那你还是遇到问题了,es.net.ssl = true
再看看。
如果仍然出现错误,请尝试添加以下配置,
'es.resource' = 'ctrl_rater_resumen_lla/hb',
'es.nodes' = 'localhost',
'es.port' = '9200',
'es.index.auto.create' = 'true',
'es.index.read.missing.as.empty' = 'true',
'es.nodes.discovery'='true',
'es.net.ssl'='false'
'es.nodes.client.only'='false',
'es.nodes.wan.only' = 'true'
'es.net.http.auth.user'='xxxxx',
'es.net.http.auth.pass' = 'xxxxx'
'es.nodes.discovery' = 'false'
经过多次尝试和不同的配置选项。我找到了一种如何在 https 上不安全地连接 elastisearch 运行
的方法
dfToEs.write.mode("append").format('org.elasticsearch.spark.sql') \
.option("es.net.http.auth.user", username) \
.option("es.net.http.auth.pass", password) \
.option("es.net.ssl", "true") \
.option("es.net.ssl.cert.allow.self.signed", "true") \
.option("mergeSchema", "true") \
.option('es.index.auto.create', 'true') \
.option('es.nodes', 'https://{}'.format(es_ip)) \
.option('es.port', '9200') \
.option('es.batch.write.retry.wait', '100s') \
.save('{index}/_doc'.format(index=index))
与
(es.net.ssl, true)
我们还必须提供如下自签名证书
(es.net.ssl.cert.allow.self.signed, true)
我确实检查了很多东西,最后我可以在 AWS ElasticSearch 服务 (ES) 中编写,但是 scala/spark。
- 在VPC中,创建安全组从EMR访问ES,端口443(ES入站规则到EMR的SG,EMR入站规则到同一个端口)
- 使用 telnet 命令从 EMR 主节点检查连接
telnet xyz.eu-west-1.es.amazonaws.com 443
完成以上检查后,使用 curl 命令检查应用级别
curl https://xyz.eu-west-1.es.amazonaws.com:443/domainname/_search?pretty=true&?q=*```
之后,转到代码,在我的例子中,我确实使用 spark-shell 进行了测试,但是服务器配置包含在启动中,如下所示:
spark-shell --jars elasticsearch-spark-20_2.11-7.1.1.jar --conf spark.es.nodes="xyz.eu-west-1.es.amazonaws.com" --conf spark.es.port=443 --conf spark.es.nodes.wan.only=true --conf spark.es.nodes.discovery="false" --conf spark.es.index.auto.create="true" --conf spark.es.resource="domain/doc" --conf spark.es.scheme="https"
- 终于要写代码了:
import java.util.Date
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._
val dateformat = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
val currentdate = dateformat.format(new Date)
val colorsDF = spark.read.json("multilinecolors.json")
val mcolors = colorsDF.withColumn("Date",lit(currentdate))
mcolors.write.mode("append").format("org.elasticsearch.spark.sql").option("es.net.http.auth.user", "").option("es.net.http.auth.pass", "").option("es.net.ssl", "true").option("es.net.ssl.cert.allow.self.signed", "true").option("mergeSchema", "true").option("es.index.auto.create", "true").option("es.nodes","https://xyz.eu-west-1.es.amazonaws.com").option("es.port", "443").option("es.batch.write.retry.wait", "100").save("domainname/_doc")```
我的最终目标是将数据从 hdfs 插入到 elasticsearch,但我面临的问题是连接性
我可以使用以下 curl 命令连接到我的 elasticsearch 节点
curl -u username -X GET https://xx.xxx.xx.xxx:9200/_cat/indices?v' --insecure
但是说到与spark的联系,我做不到。我插入数据的命令是
df.write.mode("append").format('org.elasticsearch.spark.sql').option("es.net.http.auth.user", "username").option("es.net.http.auth.pass", "password").option("es.index.auto.create","true").option('es.nodes', 'https://xx.xxx.xx.xxx').option('es.port','9200').save('my-index/my-doctype')
我得到的错误是
org.elastisearch.hadoop.EsHadoopIllegalArgumentException:Cannot detect ES version - typical this happens if then network/Elasticsearch cluster is not accessible or when targetting a Wan/Cloud instance without the proper setting 'es.nodes.wan.only'
....
....
Caused by: org.elasticseach.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proy settings)- all nodes failed; tried [[xx.xxx.xx.xxx:9200]]
....
...
在这里,什么是 curl --insecure
的 pyspark 等价物谢谢
你能试试下面的 sparkConfs,
val sparkConf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.es.index.auto.create", "true")
.set("spark.es.nodes", "yourESaddress")
.set("spark.es.port", "9200")
.set("spark.es.net.http.auth.user","")
.set("spark.es.net.http.auth.pass", "")
.set("spark.es.resource", indexName)
.set("spark.es.nodes.wan.only", "true")
那你还是遇到问题了,es.net.ssl = true
再看看。
如果仍然出现错误,请尝试添加以下配置,
'es.resource' = 'ctrl_rater_resumen_lla/hb',
'es.nodes' = 'localhost',
'es.port' = '9200',
'es.index.auto.create' = 'true',
'es.index.read.missing.as.empty' = 'true',
'es.nodes.discovery'='true',
'es.net.ssl'='false'
'es.nodes.client.only'='false',
'es.nodes.wan.only' = 'true'
'es.net.http.auth.user'='xxxxx',
'es.net.http.auth.pass' = 'xxxxx'
'es.nodes.discovery' = 'false'
经过多次尝试和不同的配置选项。我找到了一种如何在 https 上不安全地连接 elastisearch 运行
的方法 dfToEs.write.mode("append").format('org.elasticsearch.spark.sql') \
.option("es.net.http.auth.user", username) \
.option("es.net.http.auth.pass", password) \
.option("es.net.ssl", "true") \
.option("es.net.ssl.cert.allow.self.signed", "true") \
.option("mergeSchema", "true") \
.option('es.index.auto.create', 'true') \
.option('es.nodes', 'https://{}'.format(es_ip)) \
.option('es.port', '9200') \
.option('es.batch.write.retry.wait', '100s') \
.save('{index}/_doc'.format(index=index))
与
(es.net.ssl, true)
我们还必须提供如下自签名证书
(es.net.ssl.cert.allow.self.signed, true)
我确实检查了很多东西,最后我可以在 AWS ElasticSearch 服务 (ES) 中编写,但是 scala/spark。
- 在VPC中,创建安全组从EMR访问ES,端口443(ES入站规则到EMR的SG,EMR入站规则到同一个端口)
- 使用 telnet 命令从 EMR 主节点检查连接
telnet xyz.eu-west-1.es.amazonaws.com 443
完成以上检查后,使用 curl 命令检查应用级别
curl https://xyz.eu-west-1.es.amazonaws.com:443/domainname/_search?pretty=true&?q=*```
之后,转到代码,在我的例子中,我确实使用 spark-shell 进行了测试,但是服务器配置包含在启动中,如下所示:
spark-shell --jars elasticsearch-spark-20_2.11-7.1.1.jar --conf spark.es.nodes="xyz.eu-west-1.es.amazonaws.com" --conf spark.es.port=443 --conf spark.es.nodes.wan.only=true --conf spark.es.nodes.discovery="false" --conf spark.es.index.auto.create="true" --conf spark.es.resource="domain/doc" --conf spark.es.scheme="https"
- 终于要写代码了:
import java.util.Date import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.elasticsearch.spark._ import org.elasticsearch.spark.sql._ val dateformat = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss") val currentdate = dateformat.format(new Date) val colorsDF = spark.read.json("multilinecolors.json") val mcolors = colorsDF.withColumn("Date",lit(currentdate)) mcolors.write.mode("append").format("org.elasticsearch.spark.sql").option("es.net.http.auth.user", "").option("es.net.http.auth.pass", "").option("es.net.ssl", "true").option("es.net.ssl.cert.allow.self.signed", "true").option("mergeSchema", "true").option("es.index.auto.create", "true").option("es.nodes","https://xyz.eu-west-1.es.amazonaws.com").option("es.port", "443").option("es.batch.write.retry.wait", "100").save("domainname/_doc")```