Exception Handling in Apache Beam pipelines when writing to a database using Java
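When writing simple records to a table in Postgres (it could be any database) at the end of a pipeline, some of the records violate a uniqueness constraint and trigger an exception. As far as I can tell, there is no straightforward way to handle these gracefully: the pipeline either fails outright or, depending on the runner, enters an endless death spiral.
The Beam docs on error handling and the Medium posts on the subject don't seem to mention anything about this case; their error-handling patterns don't appear to apply to this particular kind of PTransform, which returns PDone.
The one existing answer I found is hard to follow and lacks examples.
In my example, I'm reading from a file that contains two duplicate lines and trying to write them to a table.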
CREATE TABLE foo (
field CHARACTER VARYING(100) UNIQUE
);
foo.txt contains:
a
a
The pipeline looks like this:
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.jdbc.JdbcIO;

Pipeline p = Pipeline.create();
p.apply(TextIO.read().from("/path/to/foo.txt"))
    .apply(
        JdbcIO.<String>write()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(
                    "org.postgresql.Driver", "jdbc:postgresql://localhost:5432/somedb"))
            .withStatement("INSERT INTO foo (field) VALUES (?)")
            // Bind each input line to the single "?" placeholder.
            .withPreparedStatementSetter(
                new JdbcIO.PreparedStatementSetter<String>() {
                  private static final long serialVersionUID = 1L;

                  @Override
                  public void setParameters(String element, PreparedStatement query)
                      throws SQLException {
                    query.setString(1, element);
                  }
                }));
p.run();
Here is the output of the simple example above:
[WARNING]
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO foo (field) VALUES ('a') was aborted: ERROR: duplicate key value violates unique constraint "foo_field_key"
Detail: Key (field)=(a) already exists. Call getNextException to see other errors in the batch.
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:332)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:302)
at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:197)
at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:64)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:313)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:299)
at com.thing.Main.main (Main.java:105)
at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:748)
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO foo (field) VALUES ('a') was aborted: ERROR: duplicate key value violates unique constraint "foo_field_key"
Detail: Key (field)=(a) already exists. Call getNextException to see other errors in the batch.
at org.postgresql.jdbc.BatchResultHandler.handleError (BatchResultHandler.java:148)
at org.postgresql.core.ResultHandlerDelegate.handleError (ResultHandlerDelegate.java:50)
at org.postgresql.core.v3.QueryExecutorImpl.processResults (QueryExecutorImpl.java:2184)
at org.postgresql.core.v3.QueryExecutorImpl.execute (QueryExecutorImpl.java:481)
at org.postgresql.jdbc.PgStatement.executeBatch (PgStatement.java:840)
at org.postgresql.jdbc.PgPreparedStatement.executeBatch (PgPreparedStatement.java:1538)
at org.apache.commons.dbcp2.DelegatingStatement.executeBatch (DelegatingStatement.java:345)
at org.apache.commons.dbcp2.DelegatingStatement.executeBatch (DelegatingStatement.java:345)
at org.apache.commons.dbcp2.DelegatingStatement.executeBatch (DelegatingStatement.java:345)
at org.apache.commons.dbcp2.DelegatingStatement.executeBatch (DelegatingStatement.java:345)
at org.apache.beam.sdk.io.jdbc.JdbcIO$Write$WriteFn.executeBatch (JdbcIO.java:846)
at org.apache.beam.sdk.io.jdbc.JdbcIO$Write$WriteFn.finishBundle (JdbcIO.java:819)
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "foo_field_key"
Detail: Key (field)=(a) already exists.
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse (QueryExecutorImpl.java:2440)
at org.postgresql.core.v3.QueryExecutorImpl.processResults (QueryExecutorImpl.java:2183)
at org.postgresql.core.v3.QueryExecutorImpl.execute (QueryExecutorImpl.java:481)
at org.postgresql.jdbc.PgStatement.executeBatch (PgStatement.java:840)
at org.postgresql.jdbc.PgPreparedStatement.executeBatch (PgPreparedStatement.java:1538)
at org.apache.commons.dbcp2.DelegatingStatement.executeBatch (DelegatingStatement.java:345)
at org.apache.commons.dbcp2.DelegatingStatement.executeBatch (DelegatingStatement.java:345)
at org.apache.commons.dbcp2.DelegatingStatement.executeBatch (DelegatingStatement.java:345)
at org.apache.commons.dbcp2.DelegatingStatement.executeBatch (DelegatingStatement.java:345)
at org.apache.beam.sdk.io.jdbc.JdbcIO$Write$WriteFn.executeBatch (JdbcIO.java:846)
at org.apache.beam.sdk.io.jdbc.JdbcIO$Write$WriteFn.finishBundle (JdbcIO.java:819)
at org.apache.beam.sdk.io.jdbc.JdbcIO$Write$WriteFn$DoFnInvoker.invokeFinishBundle (Unknown Source)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.finishBundle (SimpleDoFnRunner.java:285)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle (SimplePushbackSideInputDoFnRunner.java:118)
at org.apache.beam.runners.direct.ParDoEvaluator.finishBundle (ParDoEvaluator.java:223)
at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.finishBundle (DoFnLifecycleManagerRemovingTransformEvaluator.java:73)
at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle (DirectTransformExecutor.java:188)
at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:126)
at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
at java.util.concurrent.FutureTask.run (FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
at java.lang.Thread.run (Thread.java:748)
I'd like to be able to catch that exception and divert it to some kind of dead-letter structure.
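Diverting the exception starts with recognizing it. In the trace above, the BatchUpdateException says to call getNextException, and the per-row PSQLException also shows up as its cause; PostgreSQL reports a unique-constraint violation with SQLSTATE 23505. A minimal, stdlib-only sketch of a helper that checks for it follows. The class and method names are hypothetical, not part of JdbcIO or pgjdbc; the check relies on java.sql.SQLException being Iterable over itself, its causes, and each exception in its getNextException() chain.

```java
import java.sql.SQLException;

/** Hypothetical helper for classifying JDBC errors; not part of JdbcIO or pgjdbc. */
final class SqlErrors {
  // PostgreSQL SQLSTATE for unique_violation (class 23: integrity constraint violation).
  static final String UNIQUE_VIOLATION = "23505";

  private SqlErrors() {}

  /** True if e, anything chained via getNextException(), or any of their causes is a unique violation. */
  static boolean isUniqueViolation(SQLException e) {
    // SQLException is Iterable<Throwable>: it visits e and its causes, then each next exception and its causes.
    for (Throwable t : e) {
      if (t instanceof SQLException
          && UNIQUE_VIOLATION.equals(((SQLException) t).getSQLState())) {
        return true;
      }
    }
    return false;
  }

  /** Joins the SQLSTATE and message of every link in the getNextException() chain, e.g. for a dead-letter row. */
  static String describe(SQLException e) {
    StringBuilder sb = new StringBuilder();
    for (SQLException cur = e; cur != null; cur = cur.getNextException()) {
      if (sb.length() > 0) {
        sb.append(" | ");
      }
      sb.append(cur.getSQLState()).append(": ").append(cur.getMessage());
    }
    return sb.toString();
  }
}
```

Checking the SQLSTATE rather than the message text keeps the test independent of the server's message language.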
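There is no general way to do this in Beam yet. There are occasional discussions about changing IOs so they don't return PDone, but as far as I know nothing is available out of the box.
For now I can think of a few workarounds, all far from ideal:
- handle the pipeline failure in the driver program and restart;
- copy-paste JdbcIO (or part of it), or implement your own JDBC ParDo with custom exception handling;
- add exception-handling support to JdbcIO and contribute it to Beam, which would be much appreciated.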
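For the second workaround, the per-element logic of a hand-written JDBC ParDo can be kept apart from Beam so it is easy to test. A minimal sketch, assuming a hypothetical RowWriter that binds one element and runs a single INSERT: each SQLException is caught and recorded together with the element instead of failing the bundle. DeadLetterWriter, RowWriter and Failure are illustrative names only.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical core of a hand-written JDBC ParDo: write each element on its own and
 * route failures to a dead-letter list instead of letting the exception fail the bundle.
 */
final class DeadLetterWriter<T> {

  /** Hypothetical stand-in for "bind the element and run executeUpdate()". */
  @FunctionalInterface
  public interface RowWriter<T> {
    void write(T row) throws SQLException;
  }

  /** A failed element plus the error text to store alongside it. */
  public static final class Failure<T> {
    public final T row;
    public final String error;

    Failure(T row, String error) {
      this.row = row;
      this.error = error;
    }
  }

  public final List<T> written = new ArrayList<>();
  public final List<Failure<T>> failed = new ArrayList<>();

  private final RowWriter<T> writer;

  DeadLetterWriter(RowWriter<T> writer) {
    this.writer = writer;
  }

  /** Writes one element; a SQLException is recorded as a failure rather than rethrown. */
  public void process(T row) {
    try {
      writer.write(row);
      written.add(row);
    } catch (SQLException e) {
      failed.add(new Failure<>(row, e.getSQLState() + ": " + e.getMessage()));
    }
  }
}
```

In an actual DoFn, the connection would typically be opened in a @Setup method, and instead of collecting lists, the @ProcessElement method would emit successes to the main output and failures to a dead-letter TupleTag through MultiOutputReceiver; applying ParDo.of(fn).withOutputTags(mainTag, TupleTagList.of(deadLetterTag)) then yields a PCollectionTuple rather than PDone. The cost is one round trip per element, giving up the batching JdbcIO does (the trace shows executeBatch running from WriteFn.finishBundle).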
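I ran into the same problem, so I wrote a custom JdbcIO write that returns a PCollectionTuple instead of PDone. In it, I separate the records that were inserted successfully from the records that threw an SQLException while the batch was executing in WriteFn.
More details at the link below: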
https://sachin4java.blogspot.com/2021/11/extract-error-records-while-inserting.html
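The linked post's code is not reproduced here. One general way to classify records after a failed batch, sketched under stated assumptions, is batch-then-fallback: run the bundle as one batch, and only if it fails, replay it row by row to find the offending rows. Sink and BatchWithFallback are hypothetical names. The sketch assumes insertBatch is atomic; for a real JDBC Sink that would mean setAutoCommit(false), addBatch/executeBatch, then commit(), with rollback() on failure (in PostgreSQL a failed statement aborts the whole transaction, so the rollback is needed before retrying).

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: try the whole bundle as one batch; only if the batch fails,
 * replay it row by row to isolate the rows that cannot be inserted.
 */
final class BatchWithFallback {

  /** Hypothetical abstraction over a JDBC connection. */
  public interface Sink<T> {
    /** Inserts all rows atomically: either all are kept or none are. */
    void insertBatch(List<T> rows) throws SQLException;

    /** Inserts a single row in its own transaction. */
    void insertOne(T row) throws SQLException;
  }

  private BatchWithFallback() {}

  /** Returns the rows that could not be inserted; every other row has been written. */
  static <T> List<T> write(Sink<T> sink, List<T> rows) {
    List<T> failed = new ArrayList<>();
    try {
      sink.insertBatch(rows);
      return failed; // fast path: the whole batch was committed
    } catch (SQLException batchError) {
      // The batch left nothing behind, so retry each row alone to find the offenders.
      for (T row : rows) {
        try {
          sink.insertOne(row);
        } catch (SQLException rowError) {
          failed.add(row);
        }
      }
      return failed;
    }
  }
}
```

With the foo.txt example, the first a is written and the second lands in the failed list, which a Beam wrapper could emit to a dead-letter output. Clean bundles keep the batched fast path; only bundles that actually contain bad rows pay for the extra round trips.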