Logstash Java OutOfMemory only when use_prepared_statements=>true in jdbc when synchronizing initial huge dataset
Logstash Java OutOfMemory only when use_prepared_statements=>true in jdbc when synchronizing initial huge dataset
在使用 Logstash 和 jdbc 将约 70 条 Mio 记录的庞大数据集初始加载到 Elastic 期间,我们得到 Java OutOfMemory
。
Java Heapsize 是 4GB,但即使是 8GB 或 16GB,情况也是一样的。
无需 use_prepared_statements
(使用文字)即可成功加载完全相同的数据集。
我们已经使用 use_prepared_statements=>false
成功加载了一次整个视图,但为了即使对于增量加载也能更高效,我们希望使用准备好的语句。
我们的想法是使用 jdbc_fetch_size=>1000
来限制一次获取的数据量。
根据文档,使用 jdbc_page_size
或 jdbc_paging_enabled
在使用 prepared_statements 时无效。由于后台的复杂视图,它甚至效率不高,当多个查询每次都使用偏移量进行初始加载时效率不高。
在 Oracle 中使用准备好的语句效果很好,我们看到当 Logstash 因 OutOfMemory 失败而失败时,从大约 70 mio 中提取大约 1 mio 记录。
我们当前的 logstash 配置:
input {
jdbc {
type => "sim_list"
jdbc_validate_connection => true
jdbc_driver_library => "/pkg/moip/otc/apps/ls-otc-tmsp_tua2/conf/ojdbc6.jar"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
jdbc_connection_string => "jdbc:oracle:thin:@db-server:1521:TA2A"
jdbc_user => "test"
jdbc_password => "testpwd"
tracking_column => "last_import"
use_column_value => true
tracking_column_type => "timestamp"
prepared_statement_bind_values => [":sql_last_value"]
prepared_statement_name => "get_sim_list_tua2_v5"
use_prepared_statements => true
last_run_metadata_path => "/pkg/moip/otc/data/ls-otc-tmsp_tua2/jdbc/sql_last_run_sim_list_tua2_v5"
statement => "SELECT simlist.*, simlist.LAST_UPDATE as last_import
FROM (SELECT *
FROM V_SIM_LIST_VIEW
WHERE last_update between (?) - 1/86400 and sysdate
ORDER BY last_update ASC) simlist"
# run every 5 seconds
schedule => "*/5 * * * * *"
connection_retry_attempts => "3"
jdbc_fetch_size => 1000
}
}
filter {
mutate {
copy => {
"%{di_id}" => "[@metadata][_id]"
}
split => { "user_ids" => "," }
split => { "option_ids" => "|" }
split => {"apn_ids" => "|"}
split => { "ip_address" => "|" }
}
}
output {
if [type] == "sim_list" {
elasticsearch {
hosts => ["https://ece.mydomain.com:443"]
index => "sim_list_tua2_v5"
document_id => "%{di_id}"
#doc_as_upsert => true
manage_template => false
cacert => "/pkg/moip/otc/apps/ls-otc-tmsp_tua2/conf/certs/ca.crt"
ssl => true
ssl_certificate_verification => true
user => "logstash_write"
password => "${LSWRITEPWD}"
}
}
}
想法是,使用一个大查询进行初始加载,应该按 jdbc_fetch_size
拆分。
下一个时间表应使用 :sql_last_value
来获得增量更改。
但是,它在第一轮大约 1000 次提取后很快就坏了。
目前看来,jdbc_fetch_size
没有按预期工作。所以我们不知道为什么它总是失败。
我们针对 Oracle 19c 使用 ojdbc6.jar 和 odbc10.jar 进行了测试,但没有区别。
Logstash 是 运行 Java 11.0.10.
也许可以尝试在准备语句使用的查询中使用 fetch,limit and offset。比如,我们在 SQL 级别而不是 JDBC 级别进行批处理。在要加载的每个批次中发送偏移量。我不是 Logstash 专家,但这应该是可配置的。
我们发现以下很好的解决方法:
使用具有两种不同配置的 logstash(无论如何都需要):
- 对于初始同步:使用文字
use_prepared_statements=>false
- 对于增量同步:使用
use_prepared_statements=>true
此外,由于无论如何我们都必须使用两个不同的 logstash 配置,因此我们对 Initial-Sync 的 SQL 使用了优化器提示,而没有对 Delta-Sync 的 SQL 使用这些提示.因此,我们对这两种情况都有最佳配置。
在使用 Logstash 和 jdbc 将约 70 条 Mio 记录的庞大数据集初始加载到 Elastic 期间,我们得到 Java OutOfMemory
。
Java Heapsize 是 4GB,但即使是 8GB 或 16GB,情况也是一样的。
无需 use_prepared_statements
(使用文字)即可成功加载完全相同的数据集。
我们已经使用 use_prepared_statements=>false
成功加载了一次整个视图,但为了即使对于增量加载也能更高效,我们希望使用准备好的语句。
我们的想法是使用 jdbc_fetch_size=>1000
来限制一次获取的数据量。
根据文档,使用 jdbc_page_size
或 jdbc_paging_enabled
在使用 prepared_statements 时无效。由于后台的复杂视图,它甚至效率不高,当多个查询每次都使用偏移量进行初始加载时效率不高。
在 Oracle 中使用准备好的语句效果很好,我们看到当 Logstash 因 OutOfMemory 失败而失败时,从大约 70 mio 中提取大约 1 mio 记录。
我们当前的 logstash 配置:
input {
jdbc {
type => "sim_list"
jdbc_validate_connection => true
jdbc_driver_library => "/pkg/moip/otc/apps/ls-otc-tmsp_tua2/conf/ojdbc6.jar"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
jdbc_connection_string => "jdbc:oracle:thin:@db-server:1521:TA2A"
jdbc_user => "test"
jdbc_password => "testpwd"
tracking_column => "last_import"
use_column_value => true
tracking_column_type => "timestamp"
prepared_statement_bind_values => [":sql_last_value"]
prepared_statement_name => "get_sim_list_tua2_v5"
use_prepared_statements => true
last_run_metadata_path => "/pkg/moip/otc/data/ls-otc-tmsp_tua2/jdbc/sql_last_run_sim_list_tua2_v5"
statement => "SELECT simlist.*, simlist.LAST_UPDATE as last_import
FROM (SELECT *
FROM V_SIM_LIST_VIEW
WHERE last_update between (?) - 1/86400 and sysdate
ORDER BY last_update ASC) simlist"
# run every 5 seconds
schedule => "*/5 * * * * *"
connection_retry_attempts => "3"
jdbc_fetch_size => 1000
}
}
filter {
mutate {
copy => {
"%{di_id}" => "[@metadata][_id]"
}
split => { "user_ids" => "," }
split => { "option_ids" => "|" }
split => {"apn_ids" => "|"}
split => { "ip_address" => "|" }
}
}
output {
if [type] == "sim_list" {
elasticsearch {
hosts => ["https://ece.mydomain.com:443"]
index => "sim_list_tua2_v5"
document_id => "%{di_id}"
#doc_as_upsert => true
manage_template => false
cacert => "/pkg/moip/otc/apps/ls-otc-tmsp_tua2/conf/certs/ca.crt"
ssl => true
ssl_certificate_verification => true
user => "logstash_write"
password => "${LSWRITEPWD}"
}
}
}
想法是,使用一个大查询进行初始加载,应该按 jdbc_fetch_size
拆分。
下一个时间表应使用 :sql_last_value
来获得增量更改。
但是,它在第一轮大约 1000 次提取后很快就坏了。
目前看来,jdbc_fetch_size
没有按预期工作。所以我们不知道为什么它总是失败。
我们针对 Oracle 19c 使用 ojdbc6.jar 和 odbc10.jar 进行了测试,但没有区别。 Logstash 是 运行 Java 11.0.10.
也许可以尝试在准备语句使用的查询中使用 fetch,limit and offset。比如,我们在 SQL 级别而不是 JDBC 级别进行批处理。在要加载的每个批次中发送偏移量。我不是 Logstash 专家,但这应该是可配置的。
我们发现以下很好的解决方法:
使用具有两种不同配置的 logstash(无论如何都需要):
- 对于初始同步:使用文字
use_prepared_statements=>false
- 对于增量同步:使用
use_prepared_statements=>true
此外,由于无论如何我们都必须使用两个不同的 logstash 配置,因此我们对 Initial-Sync 的 SQL 使用了优化器提示,而没有对 Delta-Sync 的 SQL 使用这些提示.因此,我们对这两种情况都有最佳配置。