不安全模式下的 Elasticsearch pyspark 连接

Elasticsearch pyspark connection in insecure mode

我的最终目标是将数据从 hdfs 插入到 elasticsearch,但我面临的问题是连接性

我可以使用以下 curl 命令连接到我的 elasticsearch 节点

curl -u username -X GET https://xx.xxx.xx.xxx:9200/_cat/indices?v' --insecure

但是说到与spark的联系,我做不到。我插入数据的命令是 df.write.mode("append").format('org.elasticsearch.spark.sql').option("es.net.http.auth.user", "username").option("es.net.http.auth.pass", "password").option("es.index.auto.create","true").option('es.nodes', 'https://xx.xxx.xx.xxx').option('es.port','9200').save('my-index/my-doctype')

我得到的错误是

org.elastisearch.hadoop.EsHadoopIllegalArgumentException:Cannot detect ES version - typical this happens if then network/Elasticsearch cluster is not accessible or when targetting a Wan/Cloud instance without the proper setting 'es.nodes.wan.only'
....
....
Caused by: org.elasticseach.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proy settings)- all nodes failed; tried [[xx.xxx.xx.xxx:9200]]
....
...

在这里,什么是 curl --insecure

的 pyspark 等价物

谢谢

你能试试下面的 sparkConfs,

val sparkConf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.es.index.auto.create", "true")
.set("spark.es.nodes", "yourESaddress")
.set("spark.es.port", "9200")
.set("spark.es.net.http.auth.user","")
.set("spark.es.net.http.auth.pass", "")
.set("spark.es.resource", indexName)
.set("spark.es.nodes.wan.only", "true")

那你还是遇到问题了,es.net.ssl = true再看看。

如果仍然出现错误,请尝试添加以下配置,

'es.resource' = 'ctrl_rater_resumen_lla/hb',
'es.nodes' = 'localhost',
'es.port' = '9200',
'es.index.auto.create' = 'true',
'es.index.read.missing.as.empty' = 'true',
'es.nodes.discovery'='true',
'es.net.ssl'='false'
'es.nodes.client.only'='false',
'es.nodes.wan.only' = 'true'
'es.net.http.auth.user'='xxxxx',
'es.net.http.auth.pass' = 'xxxxx'
'es.nodes.discovery' = 'false'

经过多次尝试和不同的配置选项。我找到了一种如何在 https 上不安全地连接 elastisearch 运行

的方法
        dfToEs.write.mode("append").format('org.elasticsearch.spark.sql') \
        .option("es.net.http.auth.user", username) \
        .option("es.net.http.auth.pass", password) \
        .option("es.net.ssl", "true") \
        .option("es.net.ssl.cert.allow.self.signed", "true") \
        .option("mergeSchema", "true") \
        .option('es.index.auto.create', 'true') \
        .option('es.nodes', 'https://{}'.format(es_ip)) \
        .option('es.port', '9200') \
        .option('es.batch.write.retry.wait', '100s') \
        .save('{index}/_doc'.format(index=index))

(es.net.ssl, true)

我们还必须提供如下自签名证书

(es.net.ssl.cert.allow.self.signed, true)

我确实检查了很多东西,最后我可以在 AWS ElasticSearch 服务 (ES) 中编写,但是 scala/spark。

  1. 在VPC中,创建安全组从EMR访问ES,端口443(ES入站规则到EMR的SG,EMR入站规则到同一个端口)
  2. 使用 telnet 命令从 EMR 主节点检查连接
    telnet xyz.eu-west-1.es.amazonaws.com 443
  1. 完成以上检查后,使用 curl 命令检查应用级别

    curl https://xyz.eu-west-1.es.amazonaws.com:443/domainname/_search?pretty=true&?q=*```
    
  2. 之后,转到代码,在我的例子中,我确实使用 spark-shell 进行了测试,但是服务器配置包含在启动中,如下所示:

     spark-shell --jars elasticsearch-spark-20_2.11-7.1.1.jar --conf spark.es.nodes="xyz.eu-west-1.es.amazonaws.com" --conf spark.es.port=443 --conf spark.es.nodes.wan.only=true --conf spark.es.nodes.discovery="false" --conf spark.es.index.auto.create="true" --conf spark.es.resource="domain/doc" --conf spark.es.scheme="https"
    
    1. 终于要写代码了:
    import java.util.Date
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
    import org.elasticsearch.spark._
    import org.elasticsearch.spark.sql._
    val dateformat =  new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
    val currentdate = dateformat.format(new Date)
    val colorsDF = spark.read.json("multilinecolors.json")
    val mcolors = colorsDF.withColumn("Date",lit(currentdate))
    mcolors.write.mode("append").format("org.elasticsearch.spark.sql").option("es.net.http.auth.user", "").option("es.net.http.auth.pass", "").option("es.net.ssl", "true").option("es.net.ssl.cert.allow.self.signed", "true").option("mergeSchema", "true").option("es.index.auto.create", "true").option("es.nodes","https://xyz.eu-west-1.es.amazonaws.com").option("es.port", "443").option("es.batch.write.retry.wait", "100").save("domainname/_doc")```