将 1GB 文件的内容流式传输到单列下的 sqlite table

stream contents of 1GB file to sqlite table under single column

下面的实现给出了 1 GB 大小的文件和 4 GB 堆的内存不足错误 space。 Files.lines 会 return 一个流,但是 运行 Collectors.joining 它会给出堆错误。

我们可以使用 jooq 和 jdbc 保留原始行分隔符并以较小的内存占用流式传输文件吗?

Stream<String> lines = Files.lines(path);

dsl.createTable(TABLE1)
            .column(COL1, SQLDataType.CLOB)
            .column(COL2, SQLDataType.CLOB)
            .execute();

dsl.insertInto(TABLE1)
                .columns(COL1, COL2)
                .values(VAL1, lines
                        .collect(Collectors.joining(System.lineSeparator())))
                .execute();

错误 ->

java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3332) ~[na:1.8.0_141]
at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124) ~[na:1.8.0_141]
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448) ~[na:1.8.0_141]
at java.lang.StringBuilder.append(StringBuilder.java:136) ~[na:1.8.0_141]
at java.lang.StringBuilder.append(StringBuilder.java:76) ~[na:1.8.0_141]
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:484) ~[na:1.8.0_141]
at java.lang.StringBuilder.append(StringBuilder.java:166) ~[na:1.8.0_141]
at java.util.StringJoiner.add(StringJoiner.java:185) ~[na:1.8.0_141]
at java.util.stream.Collectors$$Lambda1/1699129225.accept(Unknown Source) ~[na:na]
at java.util.stream.ReduceOpsReducingSink.accept(ReduceOps.java:169) ~[na:1.8.0_141]
at java.util.Iterator.forEachRemaining(Iterator.java:116) ~[na:1.8.0_141]
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801) ~[na:1.8.0_141]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[na:1.8.0_141]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[na:1.8.0_141]
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[na:1.8.0_141]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[na:1.8.0_141]
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) ~[na:1.8.0_141]

jOOQ 对 CLOB 数据类型的默认数据类型绑定将 CLOB 类型视为普通 String 类型,这适用于中小型 lob。对于较大的 lob,JDBC API 的流式版本会更合适。理想情况下,您将创建自己的数据类型绑定,在其中优化流式传输的写入操作: https://www.jooq.org/doc/latest/manual/code-generation/custom-data-type-bindings

例如:

class StreamingLobBinding implements Binding<String, File> {
    ...
    @Override
    public void set(BindingSetStatementContext<File> ctx) {
        // Ideally: register the input stream somewhere for explicit resource management
        ctx.statement()
           .setBinaryStream(ctx.index(), new FileInputStream(ctx.value()));
    }
}

然后,您可以将此绑定应用到您的列并让代码生成器按照上述 link 中的说明提取它,或者您仅将其应用于单次使用:

DataType<File> fileType = COL2.getDataType().asConvertedDataType(new StreamingLobBinding());
Field<File> fileCol = DSL.field(COL2.getName(), fileType);
dsl.insertInto(TABLE1)
   .columns(COL1, fileCol)
   .values(VAL1, DSL.val(theFile, fileType))
   .execute();

请注意,目前,您可能需要在某些 ThreadLocal 中注册您的输入流以记住它并在语句执行后清理它。未来的 jOOQ 版本将提供 SPI 来处理:https://github.com/jOOQ/jOOQ/issues/7591

由于 sqlite-jdbc 使用 setBinaryStream 抛出 SQLFeatureNotSupportedException,还有一种方法如上面 github 问题中所建议的。

dsl.insertInto(TABLE1)
        .columns(TABLE1.COL1, TABLE1.COL2)
        .values("ABB", null)
        .execute();

BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8));
StringBuilder lines = new StringBuilder();
int bytesCount = 0;
String line = null;
int value = 0;
Field<String> coalesce = DSL.coalesce(TABLE1.COL2, "");
char[] buffer = new char[100 * 1000 * 1000];

while ((value = reader.read(buffer)) != -1) {
    lines.append(String.valueOf(buffer, 0, value));
    bytesCount += lines.length();
    if (bytesCount >= 100 * 1000 * 1000) {

        dsl.update(TABLE1).set(TABLE1.COL2, DSL.concat(coalesce, lines.toString())).where(TABLE1.COL1.eq("ABB")).execute();
        bytesCount = 0;
        lines.setLength(0);
    }
}

if (lines.length() > 0) {
   dsl.update(TABLE1).set(TABLE1.COL2, DSL.concat(coalesce, lines.toString())).where(TABLE1.COL1.eq("ABB")).execute();
}
reader.close();