Dataflow/Beam streaming inserts in BigQuery causing SSL errors

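Our Dataflow pipeline, which streams data into BigQuery (using dynamic destinations), shows many exceptions in the job logs. All of them are javax.net.ssl.SSLException: Connection reset exceptions. I have included a stack trace below, but I would like to know whether this can lead to data loss.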

There are many of these errors (hundreds per day), and they seem to coincide with worker log entries like the following:

Execution of work for computation 'P6' on key '-REDACTED-' failed with uncaught exception. Work will be retried locally.

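Can I conclude that the work is effectively retried locally on the worker and that no data is lost, even though the exception is still reported in the job logs?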

I find it hard to tell, from looking at the different logs, which errors actually result in data loss.

Example stack trace:

Error message from worker: java.lang.RuntimeException: javax.net.ssl.SSLException: Connection reset
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:954) 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:994) 
org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.flushRows(BatchedStreamingWrite.java:375) 
org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.access0(BatchedStreamingWrite.java:69) 
org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements.finishBundle(BatchedStreamingWrite.java:271) 

Caused by: javax.net.ssl.SSLException: Connection reset
java.base/sun.security.ssl.Alert.createSSLException(Alert.java:127) 
java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:350) 
java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:293) 
java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:288) 
java.base/sun.security.ssl.SSLSocketImpl.handleException(SSLSocketImpl.java:1581) 
java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:979) 
java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:252) 
java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:292) 
java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351) 
java.base/sun.net.www.http.ChunkedInputStream.readAheadBlocking(ChunkedInputStream.java:552) 
java.base/sun.net.www.http.ChunkedInputStream.readAhead(ChunkedInputStream.java:609) 
java.base/sun.net.www.http.ChunkedInputStream.read(ChunkedInputStream.java:696) 
java.base/java.io.FilterInputStream.read(FilterInputStream.java:133) 
java.base/sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3510) 
com.google.api.client.http.javanet.NetHttpResponse$SizeValidatingInputStream.read(NetHttpResponse.java:164) 
java.base/java.io.FilterInputStream.read(FilterInputStream.java:133) 
java.base/java.io.FilterInputStream.read(FilterInputStream.java:107) 
com.google.common.io.ByteStreams.exhaust(ByteStreams.java:274) 
com.google.api.client.http.ConsumingInputStream.close(ConsumingInputStream.java:40) 
java.base/java.util.zip.InflaterInputStream.close(InflaterInputStream.java:232) 
java.base/java.util.zip.GZIPInputStream.close(GZIPInputStream.java:137) 
com.fasterxml.jackson.core.json.UTF8StreamJsonParser._closeInput(UTF8StreamJsonParser.java:252) 
com.fasterxml.jackson.core.base.ParserBase.close(ParserBase.java:369) 
com.google.api.client.json.jackson2.JacksonParser.close(JacksonParser.java:48) 
com.google.api.client.json.JsonParser.parse(JsonParser.java:363) 
com.google.api.client.json.JsonParser.parse(JsonParser.java:335) 
com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:79) 
com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:73) 
com.google.api.client.http.HttpResponse.parseAs(HttpResponse.java:456) 
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591) 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.lambda$insertAll(BigQueryServicesImpl.java:878) 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$BoundedExecutorService$SemaphoreCallable.call(BigQueryServicesImpl.java:1342) 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
java.base/java.lang.Thread.run(Thread.java:834) 

Caused by: java.net.SocketException: Connection reset
java.base/java.net.SocketInputStream.read(SocketInputStream.java:186) 
java.base/java.net.SocketInputStream.read(SocketInputStream.java:140) 
java.base/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:476) 
java.base/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:470) 
java.base/sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:70) 
java.base/sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1354) 
java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:963) 
java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:252) 
java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:292) 
java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351) 
java.base/sun.net.www.http.ChunkedInputStream.readAheadBlocking(ChunkedInputStream.java:552) 
java.base/sun.net.www.http.ChunkedInputStream.readAhead(ChunkedInputStream.java:609) 
java.base/sun.net.www.http.ChunkedInputStream.read(ChunkedInputStream.java:696) 
java.base/java.io.FilterInputStream.read(FilterInputStream.java:133) 
java.base/sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3510) 
com.google.api.client.http.javanet.NetHttpResponse$SizeValidatingInputStream.read(NetHttpResponse.java:164) 
java.base/java.io.FilterInputStream.read(FilterInputStream.java:133) 
java.base/java.io.FilterInputStream.read(FilterInputStream.java:107) 
com.google.common.io.ByteStreams.exhaust(ByteStreams.java:274) 
com.google.api.client.http.ConsumingInputStream.close(ConsumingInputStream.java:40) 
java.base/java.util.zip.InflaterInputStream.close(InflaterInputStream.java:232) 
java.base/java.util.zip.GZIPInputStream.close(GZIPInputStream.java:137) 
com.fasterxml.jackson.core.json.UTF8StreamJsonParser._closeInput(UTF8StreamJsonParser.java:252) 
com.fasterxml.jackson.core.base.ParserBase.close(ParserBase.java:369) 
com.google.api.client.json.jackson2.JacksonParser.close(JacksonParser.java:48) 
com.google.api.client.json.JsonParser.parse(JsonParser.java:363) 
com.google.api.client.json.JsonParser.parse(JsonParser.java:335) 
com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:79) 
com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:73) 
com.google.api.client.http.HttpResponse.parseAs(HttpResponse.java:456) 
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591) 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.lambda$insertAll(BigQueryServicesImpl.java:878) 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$BoundedExecutorService$SemaphoreCallable.call(BigQueryServicesImpl.java:1342) 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
java.base/java.lang.Thread.run(Thread.java:834)

These errors should not cause data loss, because the bundle is retried whenever an exception is thrown.
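
To make the retry behaviour concrete, here is a minimal, self-contained Java sketch. It is a toy model, not Beam's or Dataflow's actual code: `BundleRetrySketch`, `FakeSink`, `processBundle`, and the `maxAttempts` cap are hypothetical names introduced only for illustration. It assumes the sink de-duplicates rows by an insert ID, so a bundle that is resent after a connection reset does not create duplicates. The simulated failure is thrown after the rows have been stored, loosely mirroring the stack trace above, where the reset happens while the response is being read and the request may already have been applied.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of bundle retry. Not Beam's or Dataflow's actual implementation. */
public class BundleRetrySketch {

    /** Hypothetical sink that keys rows by insert ID, so resent rows overwrite instead of duplicating. */
    public static class FakeSink {
        public final Map<String, String> rows = new LinkedHashMap<>();
        private int failuresLeft;

        public FakeSink(int failuresLeft) {
            this.failuresLeft = failuresLeft;
        }

        public void insertAll(Map<String, String> bundle) {
            // Store the rows first, then fail: models a reset that occurs while reading the response.
            rows.putAll(bundle);
            if (failuresLeft > 0) {
                failuresLeft--;
                throw new RuntimeException("simulated Connection reset");
            }
        }
    }

    /** Resends the whole bundle until insertAll returns normally; returns the attempt that succeeded. */
    public static int processBundle(FakeSink sink, Map<String, String> bundle, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                sink.insertAll(bundle);
                return attempt;
            } catch (RuntimeException e) {
                // The exception is logged, but the bundle is not dropped.
                System.out.println("attempt " + attempt + " failed (" + e.getMessage() + "), retrying bundle");
            }
        }
        // The cap only keeps this sketch finite; it is an assumption of the model.
        throw new IllegalStateException("bundle still failing after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        Map<String, String> bundle = new LinkedHashMap<>();
        bundle.put("insert-id-1", "row A");
        bundle.put("insert-id-2", "row B");

        FakeSink sink = new FakeSink(2); // fail the first two attempts
        int attempts = processBundle(sink, bundle, 5);
        System.out.println("succeeded on attempt " + attempts + ", rows in sink: " + sink.rows.size());
    }
}
```

With two simulated failures, the bundle succeeds on the third attempt and the sink holds exactly two rows: the exceptions were logged, but nothing was lost. In a real pipeline, whether retries can produce duplicate rows depends on how insert IDs are configured for the write.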