如何更改非常大的 ElasticSearch 索引的映射类型?
How to change mapping type on a very large ElasticSearch index?
我想将 ElasticSearch 映射 属性 从 text
更改为 ip
,但它们是不兼容的类型。
我知道公认的答案是:
- 使用正确的映射创建新索引;
- 运行
_reindex
API把数据复制过来;
- 冲杯咖啡;
- 删除原索引;
- 创建一个从原始索引到新索引的别名。
但是,我正在处理 363GB 索引中的 50 亿条记录,每天可能有数十万条新记录被写入其中。上述过程在删除旧索引和创建新别名的过程中至少会涉及一些停机时间,并且在重建索引完成和下一步之间的时间内会丢失记录。 (重新编制索引的过程即使不是几天,也需要数小时。)就地进行转换会好得多,也许创建一个新字段并复制旧索引?有什么想法吗?
确实re-indexing这么大的索引不是一个好主意我建议创建一个新字段并使用摄取管道将其映射到新字段并在应用程序端进行更改以获得正确的数据类型。(您可以使用 update_byQuery 添加新字段)
另一种选择是使用运行时字段(在 ES 7.11 中引入),在这种情况下,您的解析将从索引时间切换到查询时间。
关于 re-indexing,您可以通过使用读写别名在 ES 中实现 zero-downtime 重建索引以获取更多信息,请查看 here。
实际上比那容易得多。过程如下:
一个。修改您的映射以将 ip
sub-field 添加到您现有的字段,例如
PUT index/_mapping
{
"properties": {
"your_ip_field": {
"type": "text",
"fields": {
"ip": {
"type": "ip"
}
}
}
}
}
乙。在您的索引
上调用index/_update_by_query?wait_for_completion=false&slices=auto
C。这里什么都没有,仅此而已。
对 _update_by_query
的调用将简单地获取每个 _source
文档并将其重新索引到自身(即新版本),然后 ip
sub-field 将可用于你的问题。
每天写入索引的所有新文档都将使用新映射,旧文档将在更新调用期间更新。不需要别名切换,也不需要重建索引。
更新完成后,您将能够引用 your_ip_field.ip
字段字段以使用它的 ip
版本。
这样做的缺点是您的倒排索引中仍然会有 your_ip_field
作为标记化文本,您可能不需要,但您不能全部拥有。有更复杂的解决方案可以使它正确,但这个很简单,可以让你开始。
感谢 @Kaveh 我使用了摄取 pipeline/update_by_query 解决方案。
我会将解决方案描述为任何正在寻找它的人的答案。
0。我正在将名为“user_ip”的关键字字段转换为“my_index”上名为“ip_addr”的 IP 字段。
您可以通过 Kibana 完成大部分工作,但我将使用 ElasticSearch Dev Tools Console 样式,因此如果您愿意,您可以运行 全部完成。
1.创建摄取管道
使用 Kibana 很容易做到这一点,as per these instructions。摄取管道名为 ip_transform
。如果你想 运行 它作为代码,它看起来如下:
PUT _ingest/pipeline/ip_transform
{
"processors": [
{
"convert": {
"field": "user_ip",
"type": "ip",
"target_field": "ip_addr",
"ignore_missing": true
}
}
]
}
2。在索引中创建字段
PUT my_index/_mapping
{
"properties": {
"ip_addr": {
"type": "ip"
}
}
}
3。将此设置为该索引的默认管道
这不是绝对必要的,您可以将 ?pipeline=ip_transform
添加到您的调用中,但我不想更改我的代码,我一直想 运行 这个管道。
PUT my_index/_settings
{
"index.default_pipeline": "ip_transform"
}
4。 运行 现有记录上的管道
我们使用 _update_by_query API 到 运行 管道对付每个现有记录。
POST my_index/_update_by_query?pipeline=ip_transform&wait_for_completion=false
{
"query": {
"bool": {
"must_not": {
"exists": {
"field": "ip_addr"
}
}
}
}
}
这对于大数据集来说会花费一些时间,但是您可以获取返回的 ID 并查询它是否完整:
GET _tasks/<Job ID>
我想将 ElasticSearch 映射 属性 从 text
更改为 ip
,但它们是不兼容的类型。
我知道公认的答案是:
- 使用正确的映射创建新索引;
- 运行
_reindex
API把数据复制过来; - 冲杯咖啡;
- 删除原索引;
- 创建一个从原始索引到新索引的别名。
但是,我正在处理 363GB 索引中的 50 亿条记录,每天可能有数十万条新记录被写入其中。上述过程在删除旧索引和创建新别名的过程中至少会涉及一些停机时间,并且在重建索引完成和下一步之间的时间内会丢失记录。 (重新编制索引的过程即使不是几天,也需要数小时。)就地进行转换会好得多,也许创建一个新字段并复制旧索引?有什么想法吗?
确实re-indexing这么大的索引不是一个好主意我建议创建一个新字段并使用摄取管道将其映射到新字段并在应用程序端进行更改以获得正确的数据类型。(您可以使用 update_byQuery 添加新字段)
另一种选择是使用运行时字段(在 ES 7.11 中引入),在这种情况下,您的解析将从索引时间切换到查询时间。
关于 re-indexing,您可以通过使用读写别名在 ES 中实现 zero-downtime 重建索引以获取更多信息,请查看 here。
实际上比那容易得多。过程如下:
一个。修改您的映射以将 ip
sub-field 添加到您现有的字段,例如
PUT index/_mapping
{
"properties": {
"your_ip_field": {
"type": "text",
"fields": {
"ip": {
"type": "ip"
}
}
}
}
}
乙。在您的索引
上调用index/_update_by_query?wait_for_completion=false&slices=auto
C。这里什么都没有,仅此而已。
对 _update_by_query
的调用将简单地获取每个 _source
文档并将其重新索引到自身(即新版本),然后 ip
sub-field 将可用于你的问题。
每天写入索引的所有新文档都将使用新映射,旧文档将在更新调用期间更新。不需要别名切换,也不需要重建索引。
更新完成后,您将能够引用 your_ip_field.ip
字段字段以使用它的 ip
版本。
这样做的缺点是您的倒排索引中仍然会有 your_ip_field
作为标记化文本,您可能不需要,但您不能全部拥有。有更复杂的解决方案可以使它正确,但这个很简单,可以让你开始。
感谢 @Kaveh 我使用了摄取 pipeline/update_by_query 解决方案。
我会将解决方案描述为任何正在寻找它的人的答案。
0。我正在将名为“user_ip”的关键字字段转换为“my_index”上名为“ip_addr”的 IP 字段。
您可以通过 Kibana 完成大部分工作,但我将使用 ElasticSearch Dev Tools Console 样式,因此如果您愿意,您可以运行 全部完成。
1.创建摄取管道
使用 Kibana 很容易做到这一点,as per these instructions。摄取管道名为 ip_transform
。如果你想 运行 它作为代码,它看起来如下:
PUT _ingest/pipeline/ip_transform
{
"processors": [
{
"convert": {
"field": "user_ip",
"type": "ip",
"target_field": "ip_addr",
"ignore_missing": true
}
}
]
}
2。在索引中创建字段
PUT my_index/_mapping
{
"properties": {
"ip_addr": {
"type": "ip"
}
}
}
3。将此设置为该索引的默认管道
这不是绝对必要的,您可以将 ?pipeline=ip_transform
添加到您的调用中,但我不想更改我的代码,我一直想 运行 这个管道。
PUT my_index/_settings
{
"index.default_pipeline": "ip_transform"
}
4。 运行 现有记录上的管道
我们使用 _update_by_query API 到 运行 管道对付每个现有记录。
POST my_index/_update_by_query?pipeline=ip_transform&wait_for_completion=false
{
"query": {
"bool": {
"must_not": {
"exists": {
"field": "ip_addr"
}
}
}
}
}
这对于大数据集来说会花费一些时间,但是您可以获取返回的 ID 并查询它是否完整:
GET _tasks/<Job ID>