是否可以使用 elasticsearch-hadoop/spark 写入具有格式化日期的动态创建的 Elasticsearch 索引?

Is it possible to write to a dynamically created Elasticsearch index with a formatted date using elasticsearch-hadoop/spark?

在独立的 spark 中,我正在尝试从数据框写入 Elasticsearch。虽然我可以让它工作,但我不知道如何写入格式为 'index_name-{ts_col:{YYYY-mm-dd}}' 的动态命名索引,其中 'ts_col' 是数据集中的日期时间字段。

我看过各种各样的帖子说这种语法应该有效,但是当我尝试它时,我发现错误包含在底部。在创建索引之前,它似乎首先要检查索引是否存在,但它会将未格式化的索引名称传递给它,而不是动态创建的索引名称。我尝试先使用 python elasticsearch 模块使用相同的语法创建索引,但它无法处理动态索引名称。

是否有任何可用的解决方案,或者我是否必须在 spark 中循环遍历我的数据集以找到表示的每个日期,创建我需要的索引,然后写入每个索引,一个一个时间?我错过了一些明显的东西吗? Logstash 很容易做到这一点,我不明白为什么我不能让它在 Spark 中工作。

这是我正在使用的写入命令(也尝试了它的不同变体):

df.write.format("org.elasticsearch.spark.sql")
  .option('es.index.auto.create', 'true')
  .option('es.resource', 'index_name-{ts_col:{YYYY.mm.dd}}/type_name')
  .option('es.mapping.id', 'es_id')
  .save()

这是我正在使用的罐子:

elasticsearch-hadoop-5.0.0/dist/elasticsearch-spark-20_2.11-5.0.0.jar

这是我在使用上面的写命令时得到的错误:

ERROR NetworkClient: Node [##.##.##.##:9200] failed (Invalid target URI HEAD@null/index_name-{ts_col:{YYYY.mm.dd}}/type_name); selected next node [##.##.##.##:9200]

...

...

Py4JJavaError: An error occurred while calling o114.save. : org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed;

如果我将覆盖设置为 True,我会得到:

Py4JJavaError: An error occurred while calling o58.save. : org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: no such index null at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:488) at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:446) at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:436) at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:363) at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:92) at org.elasticsearch.hadoop.rest.RestRepository.delete(RestRepository.java:455) at org.elasticsearch.spark.sql.ElasticsearchRelation.insert(DefaultSource.scala:500) at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:94) at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:442) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:280) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:745)

如果我尝试使用 Elasticsearch python 客户端提前创建索引,我会得到:

RequestError: TransportError(400, u'invalid_index_name_exception', u'Invalid index name [index_name-{ts_col:YYYY.MM.dd}], must be lowercase')

您不需要再次将日期格式放在花括号中。你可以阅读更多 here

.option('es.resource', 'index_name-{ts_col:{YYYY.mm.dd}}/type_name')

把上面的改成如下图:

.option('es.resource', 'index_name-{ts_col:YYYY.mm.dd}/type_name')

注意:确保您的ts_col字段具有正确的日期格式。