Logstash Java OutOfMemory only when use_prepared_statements=>true in jdbc when synchronizing initial huge dataset

Logstash Java OutOfMemory only when use_prepared_statements=>true in jdbc when synchronizing initial huge dataset

在使用 Logstash 和 jdbc 将约 70 条 Mio 记录的庞大数据集初始加载到 Elastic 期间,我们得到 Java OutOfMemory。 Java Heapsize 是 4GB,但即使是 8GB 或 16GB,情况也是一样的。

无需 use_prepared_statements(使用文字)即可成功加载完全相同的数据集。

我们已经使用 use_prepared_statements=>false 成功加载了一次整个视图,但为了即使对于增量加载也能更高效,我们希望使用准备好的语句。 我们的想法是使用 jdbc_fetch_size=>1000 来限制一次获取的数据量。 根据文档,使用 jdbc_page_sizejdbc_paging_enabled 在使用 prepared_statements 时无效。由于后台的复杂视图,它甚至效率不高,当多个查询每次都使用偏移量进行初始加载时效率不高。

在 Oracle 中使用准备好的语句效果很好,我们看到当 Logstash 因 OutOfMemory 失败而失败时,从大约 70 mio 中提取大约 1 mio 记录。

我们当前的 logstash 配置:

input {
    jdbc {
        type => "sim_list"
        jdbc_validate_connection => true
        jdbc_driver_library => "/pkg/moip/otc/apps/ls-otc-tmsp_tua2/conf/ojdbc6.jar"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        jdbc_connection_string => "jdbc:oracle:thin:@db-server:1521:TA2A"
        jdbc_user => "test"
        jdbc_password => "testpwd"
        tracking_column => "last_import"
        use_column_value => true
        tracking_column_type => "timestamp"
        prepared_statement_bind_values => [":sql_last_value"]
        prepared_statement_name => "get_sim_list_tua2_v5"
        use_prepared_statements => true
        last_run_metadata_path => "/pkg/moip/otc/data/ls-otc-tmsp_tua2/jdbc/sql_last_run_sim_list_tua2_v5"
        statement => "SELECT simlist.*, simlist.LAST_UPDATE as last_import
FROM (SELECT *
      FROM V_SIM_LIST_VIEW
        WHERE last_update between (?) - 1/86400 and sysdate
          ORDER BY last_update ASC) simlist"
        # run every 5 seconds
        schedule => "*/5 * * * * *"
        connection_retry_attempts => "3"
        jdbc_fetch_size => 1000
    }
}

filter {
    mutate {
        copy => {
            "%{di_id}" => "[@metadata][_id]"
        }
        split => { "user_ids" => "," }
        split => { "option_ids" => "|" }
        split => {"apn_ids" => "|"}
        split => { "ip_address" => "|" }
    }
}

output {
    if [type] == "sim_list" {
        elasticsearch {
            hosts => ["https://ece.mydomain.com:443"]
            index => "sim_list_tua2_v5"
            document_id => "%{di_id}"
            #doc_as_upsert => true
            manage_template => false
            cacert => "/pkg/moip/otc/apps/ls-otc-tmsp_tua2/conf/certs/ca.crt"
            ssl => true
            ssl_certificate_verification => true
            user => "logstash_write"
            password => "${LSWRITEPWD}"
        }
    }
}

想法是,使用一个大查询进行初始加载,应该按 jdbc_fetch_size 拆分。 下一个时间表应使用 :sql_last_value 来获得增量更改。 但是,它在第​​一轮大约 1000 次提取后很快就坏了。

目前看来,jdbc_fetch_size 没有按预期工作。所以我们不知道为什么它总是失败。

我们针对 Oracle 19c 使用 ojdbc6.jar 和 odbc10.jar 进行了测试,但没有区别。 Logstash 是 运行 Java 11.0.10.

也许可以尝试在准备语句使用的查询中使用 fetch,limit and offset。比如,我们在 SQL 级别而不是 JDBC 级别进行批处理。在要加载的每个批次中发送偏移量。我不是 Logstash 专家,但这应该是可配置的。

我们发现以下很好的解决方法:

使用具有两种不同配置的 logstash(无论如何都需要):

  1. 对于初始同步:使用文字 use_prepared_statements=>false
  2. 对于增量同步:使用 use_prepared_statements=>true

此外,由于无论如何我们都必须使用两个不同的 logstash 配置,因此我们对 Initial-Sync 的 SQL 使用了优化器提示,而没有对 Delta-Sync 的 SQL 使用这些提示.因此,我们对这两种情况都有最佳配置。