ElasticSearch 将嵌套字段重新索引为新文档
ElasticSearch reindex nested field as new documents
我目前正在更改我的 ElasticSearch 架构。
我以前在我的索引中有一个类型 Product
,带有一个嵌套字段 Product.users
。
我现在想要获得 2 个不同的索引,一个用于 Product
,另一个用于 User
,并在代码中建立两者之间的链接。
我使用 reindex
API 将我所有的 Product
文档重新索引到新索引,使用脚本删除 Product.users
字段:
ctx._source.remove('users');
但我不知道如何将我所有的 Product.users
文档重新索引到新的 User
索引,就像在脚本中一样,我会得到一个 users
的 ArrayList,我想为每个创建一个 User
文档。
有人知道如何实现吗?
对于可能遇到这种情况的人,我最终使用 scroll
和 bulk
APIs 重新索引 users
嵌套字段。
- 我使用
scroll
API 获取了 Product
批文档
- 对于每个批次迭代这些
Product
个文档
- 对每个文档迭代
Product.users
- 创建新的
User
文档并将其添加到批量
- 当我结束迭代
Product
批次时发送批量
工作 <3
您需要的是 ETL(提取、转换、加载)。
大多数时候,编写一个小 python 脚本来完全满足您的需求会更方便,但是对于 elasticsearch,有一个我喜欢的插件:Apache Spark + elasticsearch4hadoop 插件。
此外,有时 logstash 可以解决问题,但使用 Spark 时您有:
- SQL语法或支持Java/Scala/Python代码
- read/write elasticsearch 非常快,因为分布式 worker(1 个 ES 分片 = 1 个 Spark worker)
- 容错(工人崩溃?没问题)
- 集群(如果您有数十亿个文档,这是理想的选择)
与 Apache Zeppelin(已打包并准备好 Spark 的笔记本)一起使用,您会爱上它的!
我能想到的最简单的解决方案是 运行 两次重新索引命令。选择产品字段并重新索引到新产品索引后,用户一次:
POST _reindex
{
"source": {
"index": "Product",
"type": "_doc",
"_source": ["fields", "to keep in", "new Products"]
"query": {
"match_all": {}
}
},
"dest": {
"index": "new_Products"
}
}
然后您应该可以在 new_User table 上再次进行重新索引,方法是仅在第二次重新索引
中选择 Product.users
我目前正在更改我的 ElasticSearch 架构。
我以前在我的索引中有一个类型 Product
,带有一个嵌套字段 Product.users
。
我现在想要获得 2 个不同的索引,一个用于 Product
,另一个用于 User
,并在代码中建立两者之间的链接。
我使用 reindex
API 将我所有的 Product
文档重新索引到新索引,使用脚本删除 Product.users
字段:
ctx._source.remove('users');
但我不知道如何将我所有的 Product.users
文档重新索引到新的 User
索引,就像在脚本中一样,我会得到一个 users
的 ArrayList,我想为每个创建一个 User
文档。
有人知道如何实现吗?
对于可能遇到这种情况的人,我最终使用 scroll
和 bulk
APIs 重新索引 users
嵌套字段。
- 我使用
scroll
API 获取了Product
批文档 - 对于每个批次迭代这些
Product
个文档 - 对每个文档迭代
Product.users
- 创建新的
User
文档并将其添加到批量 - 当我结束迭代
Product
批次时发送批量
工作 <3
您需要的是 ETL(提取、转换、加载)。
大多数时候,编写一个小 python 脚本来完全满足您的需求会更方便,但是对于 elasticsearch,有一个我喜欢的插件:Apache Spark + elasticsearch4hadoop 插件。
此外,有时 logstash 可以解决问题,但使用 Spark 时您有:
- SQL语法或支持Java/Scala/Python代码
- read/write elasticsearch 非常快,因为分布式 worker(1 个 ES 分片 = 1 个 Spark worker)
- 容错(工人崩溃?没问题)
- 集群(如果您有数十亿个文档,这是理想的选择)
与 Apache Zeppelin(已打包并准备好 Spark 的笔记本)一起使用,您会爱上它的!
我能想到的最简单的解决方案是 运行 两次重新索引命令。选择产品字段并重新索引到新产品索引后,用户一次:
POST _reindex
{
"source": {
"index": "Product",
"type": "_doc",
"_source": ["fields", "to keep in", "new Products"]
"query": {
"match_all": {}
}
},
"dest": {
"index": "new_Products"
}
}
然后您应该可以在 new_User table 上再次进行重新索引,方法是仅在第二次重新索引
中选择 Product.users