使用 Apache Camel 插入大型 CSV 文件时出现 GC 问题

GC problems when inserting large CSV file with Apache Camel

我有一个包含大约 520 万行的大型 CSV 文件。我想解析这个文件并将数据插入数据库。我正在为此使用 apache 骆驼。

这条路线相当简单(针对此示例进行了简化)

from("file:work/customer/").id("customerDataRoute")
.split(body().tokenize("\n")).streaming()
.parallelProcessing()
.unmarshal(bindyCustomer)
.split(body())
.process(new CustomerProcessor())
.to("sql:INSERT INTO CUSTOMER_DATA(`FIELD1`,`FIELD2`) VALUES(#,#)");

bindyCustomer 是 CSV 文件的 BindyCsvDataFormat,并且 CustomerProcessor 是一个 Processor,它 returns Bindy Customer 对象的数据作为 SQL 插入的对象数组。实际对象有39个字段(上面简化)。

对于前 800.000 到 1.000.000 行,这一切正常,但随后就停止了。

我用 JVisualVM 和 Visual GC 插件监控了 camel 实例,我可以看到老一代填满了,当它达到最大值时,整个系统就会停止,但不会崩溃。 此时老年代已经满了,伊甸园 space 几乎满了,两个幸存者 space 都是空的(因为我猜它不能把任何东西移动到老年代)。

所以这里出了什么问题?对我来说,这看起来像是 Camel SQL 组件中的内存泄漏。 数据主要存储在ConcurrentHashMap对象中。

当我取出 SQL 组件时,老年代几乎没有填充。

我正在使用 Camel 2.15.1 将尝试使用 2.17.1 以查看是否可以解决问题。

更新:我试过 Camel 2.17.1(同样的问题),我尝试使用 java.sql.Statement.executeUPdate 在 Java 中插入执行插入。使用此选项,我设法插入了大约 260 万行,但随后它也停止了。有趣的是我没有收到内存错误。它只是停止了。

我没有测试你的代码,但是,我确实注意到你的第二个拆分语句没有流式传输。我建议尝试一下。如果你有太多的并行工作流,GC 可能会在你释放资源之前填满,这会锁定你。 SQL 语句花费的时间可能是让 GC 获得太多构建时间的原因,因为您正在并行化主处理。

from("file:work/customer/").id("customerDataRoute")
    .split(body().tokenize("\n")).streaming().parallelProcessing()
        .unmarshal(bindyCustomer)
        .split(body()).streaming() //Add a streaming call here and see what happens
            .process(new CustomerProcessor())
            .to("sql:INSERT INTO CUSTOMER_DATA(`FIELD1`,`FIELD2`) VALUES(#,#)");

好的,我知道这里出了什么问题。与插入部分相比,基本上读取部分太快了。该示例有点过于简单,因为在读取和插入之间有一个 seda 队列(因为我必须对示例中未显示的内容进行选择)。 但即使没有 seda 队列,它也从未完成。杀camel的时候才发现不对,收到消息说还有几千条in-flight消息

所以当插入端跟不上时,并行处理读取是没有意义的。

from("file:work/customer/").id("customerDataRoute")
        .onCompletion().log("Customer data  processing finished").end()
        .log("Processing customer data ${file:name}")
        .split(body().tokenize("\n")).streaming() //no more parallel processing
        .choice()
            .when(simple("${body} contains 'HEADER TEXT'")) //strip out the header if it exists
            .log("Skipping first line")
            .endChoice()
        .otherwise()
            .to("seda:processCustomer?size=40&concurrentConsumers=20&blockWhenFull=true")
            .endChoice();


from("seda:processCustomer?size=40&concurrentConsumers=20&blockWhenFull=true")
            .unmarshal(bindyCustomer)
            .split(body())
            .process(new CustomerProcessor()).id("CustomProcessor") //converts one Notification into an array of values for the SQL insert
.to("sql:INSERT INTO CUSTOMER_DATA(`FIELD1`,`FIELD2`) VALUES(#,#)");

我在 SEDA 队列上定义了一个大小(默认情况下它是不受限制的)并且当队列满时调用线程阻塞。

seda:processCustomer?size=40&concurrentConsumers=20&blockWhenFull=true

并行处理是通过在 SEDA 队列上使用 20 个并发使用者完成的。请注意,无论出于何种原因,您在调用路由时也必须指定队列大小(不仅是在您定义它的地方)。

现在内存消耗最小,插入 500 万条记录没有问题。