将 1GB 文件的内容流式传输到单列下的 sqlite table
stream contents of 1GB file to sqlite table under single column
下面的实现给出了 1 GB 大小的文件和 4 GB 堆的内存不足错误 space。 Files.lines 会 return 一个流,但是 运行 Collectors.joining 它会给出堆错误。
我们可以使用 jooq 和 jdbc 保留原始行分隔符并以较小的内存占用流式传输文件吗?
Stream<String> lines = Files.lines(path);
dsl.createTable(TABLE1)
.column(COL1, SQLDataType.CLOB)
.column(COL2, SQLDataType.CLOB)
.execute();
dsl.insertInto(TABLE1)
.columns(COL1, COL2)
.values(VAL1, lines
.collect(Collectors.joining(System.lineSeparator())))
.execute();
错误 ->
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3332) ~[na:1.8.0_141]
at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124) ~[na:1.8.0_141]
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448) ~[na:1.8.0_141]
at java.lang.StringBuilder.append(StringBuilder.java:136) ~[na:1.8.0_141]
at java.lang.StringBuilder.append(StringBuilder.java:76) ~[na:1.8.0_141]
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:484) ~[na:1.8.0_141]
at java.lang.StringBuilder.append(StringBuilder.java:166) ~[na:1.8.0_141]
at java.util.StringJoiner.add(StringJoiner.java:185) ~[na:1.8.0_141]
at java.util.stream.Collectors$$Lambda1/1699129225.accept(Unknown Source) ~[na:na]
at java.util.stream.ReduceOpsReducingSink.accept(ReduceOps.java:169) ~[na:1.8.0_141]
at java.util.Iterator.forEachRemaining(Iterator.java:116) ~[na:1.8.0_141]
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801) ~[na:1.8.0_141]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[na:1.8.0_141]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[na:1.8.0_141]
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[na:1.8.0_141]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[na:1.8.0_141]
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) ~[na:1.8.0_141]
jOOQ 对 CLOB
数据类型的默认数据类型绑定将 CLOB
类型视为普通 String
类型,这适用于中小型 lob。对于较大的 lob,JDBC API 的流式版本会更合适。理想情况下,您将创建自己的数据类型绑定,在其中优化流式传输的写入操作:
https://www.jooq.org/doc/latest/manual/code-generation/custom-data-type-bindings
例如:
class StreamingLobBinding implements Binding<String, File> {
...
@Override
public void set(BindingSetStatementContext<File> ctx) {
// Ideally: register the input stream somewhere for explicit resource management
ctx.statement()
.setBinaryStream(ctx.index(), new FileInputStream(ctx.value()));
}
}
然后,您可以将此绑定应用到您的列并让代码生成器按照上述 link 中的说明提取它,或者您仅将其应用于单次使用:
DataType<File> fileType = COL2.getDataType().asConvertedDataType(new StreamingLobBinding());
Field<File> fileCol = DSL.field(COL2.getName(), fileType);
dsl.insertInto(TABLE1)
.columns(COL1, fileCol)
.values(VAL1, DSL.val(theFile, fileType))
.execute();
请注意,目前,您可能需要在某些 ThreadLocal
中注册您的输入流以记住它并在语句执行后清理它。未来的 jOOQ 版本将提供 SPI 来处理:https://github.com/jOOQ/jOOQ/issues/7591
由于 sqlite-jdbc
使用 setBinaryStream
抛出 SQLFeatureNotSupportedException
,还有一种方法如上面 github 问题中所建议的。
dsl.insertInto(TABLE1)
.columns(TABLE1.COL1, TABLE1.COL2)
.values("ABB", null)
.execute();
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8));
StringBuilder lines = new StringBuilder();
int bytesCount = 0;
String line = null;
int value = 0;
Field<String> coalesce = DSL.coalesce(TABLE1.COL2, "");
char[] buffer = new char[100 * 1000 * 1000];
while ((value = reader.read(buffer)) != -1) {
lines.append(String.valueOf(buffer, 0, value));
bytesCount += lines.length();
if (bytesCount >= 100 * 1000 * 1000) {
dsl.update(TABLE1).set(TABLE1.COL2, DSL.concat(coalesce, lines.toString())).where(TABLE1.COL1.eq("ABB")).execute();
bytesCount = 0;
lines.setLength(0);
}
}
if (lines.length() > 0) {
dsl.update(TABLE1).set(TABLE1.COL2, DSL.concat(coalesce, lines.toString())).where(TABLE1.COL1.eq("ABB")).execute();
}
reader.close();
下面的实现给出了 1 GB 大小的文件和 4 GB 堆的内存不足错误 space。 Files.lines 会 return 一个流,但是 运行 Collectors.joining 它会给出堆错误。
我们可以使用 jooq 和 jdbc 保留原始行分隔符并以较小的内存占用流式传输文件吗?
Stream<String> lines = Files.lines(path);
dsl.createTable(TABLE1)
.column(COL1, SQLDataType.CLOB)
.column(COL2, SQLDataType.CLOB)
.execute();
dsl.insertInto(TABLE1)
.columns(COL1, COL2)
.values(VAL1, lines
.collect(Collectors.joining(System.lineSeparator())))
.execute();
错误 ->
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3332) ~[na:1.8.0_141]
at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124) ~[na:1.8.0_141]
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448) ~[na:1.8.0_141]
at java.lang.StringBuilder.append(StringBuilder.java:136) ~[na:1.8.0_141]
at java.lang.StringBuilder.append(StringBuilder.java:76) ~[na:1.8.0_141]
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:484) ~[na:1.8.0_141]
at java.lang.StringBuilder.append(StringBuilder.java:166) ~[na:1.8.0_141]
at java.util.StringJoiner.add(StringJoiner.java:185) ~[na:1.8.0_141]
at java.util.stream.Collectors$$Lambda1/1699129225.accept(Unknown Source) ~[na:na]
at java.util.stream.ReduceOpsReducingSink.accept(ReduceOps.java:169) ~[na:1.8.0_141]
at java.util.Iterator.forEachRemaining(Iterator.java:116) ~[na:1.8.0_141]
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801) ~[na:1.8.0_141]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[na:1.8.0_141]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[na:1.8.0_141]
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[na:1.8.0_141]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[na:1.8.0_141]
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) ~[na:1.8.0_141]
jOOQ 对 CLOB
数据类型的默认数据类型绑定将 CLOB
类型视为普通 String
类型,这适用于中小型 lob。对于较大的 lob,JDBC API 的流式版本会更合适。理想情况下,您将创建自己的数据类型绑定,在其中优化流式传输的写入操作:
https://www.jooq.org/doc/latest/manual/code-generation/custom-data-type-bindings
例如:
class StreamingLobBinding implements Binding<String, File> {
...
@Override
public void set(BindingSetStatementContext<File> ctx) {
// Ideally: register the input stream somewhere for explicit resource management
ctx.statement()
.setBinaryStream(ctx.index(), new FileInputStream(ctx.value()));
}
}
然后,您可以将此绑定应用到您的列并让代码生成器按照上述 link 中的说明提取它,或者您仅将其应用于单次使用:
DataType<File> fileType = COL2.getDataType().asConvertedDataType(new StreamingLobBinding());
Field<File> fileCol = DSL.field(COL2.getName(), fileType);
dsl.insertInto(TABLE1)
.columns(COL1, fileCol)
.values(VAL1, DSL.val(theFile, fileType))
.execute();
请注意,目前,您可能需要在某些 ThreadLocal
中注册您的输入流以记住它并在语句执行后清理它。未来的 jOOQ 版本将提供 SPI 来处理:https://github.com/jOOQ/jOOQ/issues/7591
由于 sqlite-jdbc
使用 setBinaryStream
抛出 SQLFeatureNotSupportedException
,还有一种方法如上面 github 问题中所建议的。
dsl.insertInto(TABLE1)
.columns(TABLE1.COL1, TABLE1.COL2)
.values("ABB", null)
.execute();
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8));
StringBuilder lines = new StringBuilder();
int bytesCount = 0;
String line = null;
int value = 0;
Field<String> coalesce = DSL.coalesce(TABLE1.COL2, "");
char[] buffer = new char[100 * 1000 * 1000];
while ((value = reader.read(buffer)) != -1) {
lines.append(String.valueOf(buffer, 0, value));
bytesCount += lines.length();
if (bytesCount >= 100 * 1000 * 1000) {
dsl.update(TABLE1).set(TABLE1.COL2, DSL.concat(coalesce, lines.toString())).where(TABLE1.COL1.eq("ABB")).execute();
bytesCount = 0;
lines.setLength(0);
}
}
if (lines.length() > 0) {
dsl.update(TABLE1).set(TABLE1.COL2, DSL.concat(coalesce, lines.toString())).where(TABLE1.COL1.eq("ABB")).execute();
}
reader.close();