将 MySQL 值转换为建议字段上的嵌套弹性搜索 属性 时出现 Logstash 错误
Logstash error when converting MySQL value to nested elasticsearch property on suggestion field
这里有一个巨大的帮助请求,当我尝试使用 logstash 将 MySQL 值转换为嵌套的 elasticsearch 字段时,出现以下错误。
{"exception"=>"expecting List or Map, found class org.logstash.bivalues.StringBiValue", "backtrace"=>["org.logstash.Accessors.newCollectionException(Accessors.java:195)"
使用以下配置文件:
input {
jdbc {
jdbc_driver_library => "/logstash/mysql-connector-java-5.1.42-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/data"
jdbc_user => "username"
jdbc_password => "password"
statement => "SELECT id, suggestions, address_count FROM `suggestions` WHERE id <= 100"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
}
}
filter {
mutate {
rename => { 'address_count' => '[suggestions][payload][count]' }
}
}
output {
elasticsearch {
hosts => [
"localhost:9200"
]
index => "dev_suggestions"
document_type => "address"
}
}
但是,如果我将 address_count 重命名为一个尚未在我的映射中的字段,那么它工作得很好并且它正确地将值添加为嵌套 属性,我尝试了索引中的其他字段,而不仅仅是 suggestions.payloads.address_count,我遇到了同样的问题,只有在映射中未定义该字段时才有效。
这让我很头疼,如果有人能帮助我解决这个问题,我将不胜感激,因为我在过去的 48 小时里一直在努力 table!
我最初假设我可以使用 MySQL 查询执行以下操作:
SELECT id, suggestion, '[suggestions][payload][count]' FROM `suggestions` WHERE id <= 100
那我也试了
SELECT id, suggestion, 'suggestions.payload.count' FROM `suggestions` WHERE id <= 100
两者都无法使用后面的选项插入值,给出一个字段不能包含点的错误。
最后是映射:
{
"mappings": {
"address": {
"properties": {
"suggestions": {
"type": "completion",
"payloads" : true
}
}
}
}
}
感谢 Val - 对于未来与我情况相同的用户,需要使用 logstash 将 MySQL 数据转换为嵌套的 Elasticsearch 对象,这是一个使用 Logstash 5 和 Elasticsearch 2 的工作解决方案。*
input {
jdbc {
jdbc_driver_library => "/logstash/mysql-connector-java-5.1.42-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/data"
jdbc_user => "username"
jdbc_password => "password"
statement => "SELECT addrid, suggestion, address_count FROM `suggestions` WHERE id <= 20"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
}
}
filter {
ruby {
code => "
event.set('[suggestions][input]', event.get('suggestion'))
event.set('[suggestions][payload][address_count]', event.get('address_count'))
event.set('[v][payload][id]', event.get('addrid'))
"
remove_field => [ 'suggestion', 'address_count', 'addrid' ]
}
}
output {
elasticsearch {
hosts => [
"localhost:9200"
]
index => "dev_suggestions"
document_type => "address"
}
}
我认为您需要采取不同的方式。首先,我会将 SQL 查询中的 suggestions
字段重命名为其他名称,然后根据从 SQL 查询中获得的值构建 suggestions
对象。
statement => "SELECT id, suggestion, address_count FROM `suggestions` WHERE id <= 100"
然后您可以使用 ruby
过滤器(并删除您的 mutate
过滤器)来构建您的 suggestions
字段,如下所示:
Logstash 2.x 代码:
ruby {
code => "
event['suggestions']['input'] = event['suggestion']
event['suggestions']['payload']['count'] = event['address_count']
"
remove_field => [ 'suggestion', 'address_count' ]
}
Logstash 5.x 代码:
ruby {
code => "
event.set('[suggestions][input]', event.get('suggestion'))
event.set('[suggestions][payload][count]', event.get('address_count'))
"
remove_field => [ 'suggestion', 'address_count' ]
}
PS:所有这些假设您使用的是 ES 2.x,因为 payload
字段在 ES 5.x
中消失了
这里有一个巨大的帮助请求,当我尝试使用 logstash 将 MySQL 值转换为嵌套的 elasticsearch 字段时,出现以下错误。
{"exception"=>"expecting List or Map, found class org.logstash.bivalues.StringBiValue", "backtrace"=>["org.logstash.Accessors.newCollectionException(Accessors.java:195)"
使用以下配置文件:
input {
jdbc {
jdbc_driver_library => "/logstash/mysql-connector-java-5.1.42-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/data"
jdbc_user => "username"
jdbc_password => "password"
statement => "SELECT id, suggestions, address_count FROM `suggestions` WHERE id <= 100"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
}
}
filter {
mutate {
rename => { 'address_count' => '[suggestions][payload][count]' }
}
}
output {
elasticsearch {
hosts => [
"localhost:9200"
]
index => "dev_suggestions"
document_type => "address"
}
}
但是,如果我将 address_count 重命名为一个尚未在我的映射中的字段,那么它工作得很好并且它正确地将值添加为嵌套 属性,我尝试了索引中的其他字段,而不仅仅是 suggestions.payloads.address_count,我遇到了同样的问题,只有在映射中未定义该字段时才有效。
这让我很头疼,如果有人能帮助我解决这个问题,我将不胜感激,因为我在过去的 48 小时里一直在努力 table!
我最初假设我可以使用 MySQL 查询执行以下操作:
SELECT id, suggestion, '[suggestions][payload][count]' FROM `suggestions` WHERE id <= 100
那我也试了
SELECT id, suggestion, 'suggestions.payload.count' FROM `suggestions` WHERE id <= 100
两者都无法使用后面的选项插入值,给出一个字段不能包含点的错误。
最后是映射:
{
"mappings": {
"address": {
"properties": {
"suggestions": {
"type": "completion",
"payloads" : true
}
}
}
}
}
感谢 Val - 对于未来与我情况相同的用户,需要使用 logstash 将 MySQL 数据转换为嵌套的 Elasticsearch 对象,这是一个使用 Logstash 5 和 Elasticsearch 2 的工作解决方案。*
input {
jdbc {
jdbc_driver_library => "/logstash/mysql-connector-java-5.1.42-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/data"
jdbc_user => "username"
jdbc_password => "password"
statement => "SELECT addrid, suggestion, address_count FROM `suggestions` WHERE id <= 20"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
}
}
filter {
ruby {
code => "
event.set('[suggestions][input]', event.get('suggestion'))
event.set('[suggestions][payload][address_count]', event.get('address_count'))
event.set('[v][payload][id]', event.get('addrid'))
"
remove_field => [ 'suggestion', 'address_count', 'addrid' ]
}
}
output {
elasticsearch {
hosts => [
"localhost:9200"
]
index => "dev_suggestions"
document_type => "address"
}
}
我认为您需要采取不同的方式。首先,我会将 SQL 查询中的 suggestions
字段重命名为其他名称,然后根据从 SQL 查询中获得的值构建 suggestions
对象。
statement => "SELECT id, suggestion, address_count FROM `suggestions` WHERE id <= 100"
然后您可以使用 ruby
过滤器(并删除您的 mutate
过滤器)来构建您的 suggestions
字段,如下所示:
Logstash 2.x 代码:
ruby {
code => "
event['suggestions']['input'] = event['suggestion']
event['suggestions']['payload']['count'] = event['address_count']
"
remove_field => [ 'suggestion', 'address_count' ]
}
Logstash 5.x 代码:
ruby {
code => "
event.set('[suggestions][input]', event.get('suggestion'))
event.set('[suggestions][payload][count]', event.get('address_count'))
"
remove_field => [ 'suggestion', 'address_count' ]
}
PS:所有这些假设您使用的是 ES 2.x,因为 payload
字段在 ES 5.x