Exceptions in Google Cloud Dataflow pipelines from BigQuery to Cloud Bigtable
Executing Dataflow pipelines, every once in a while we see these exceptions. Is there anything we can do about them? We have a pretty simple flow that reads data from a BigQuery query and populates the data into BigTable.
Also, what happens to the data inside the pipeline? Is it reprocessed? Or is it lost in transit to BigTable?
// Prepare the pipeline for Bigtable writes, read from BigQuery,
// convert each TableRow to an HBase Mutation, and write to Bigtable.
CloudBigtableIO.initializeForWrite(p);

p.apply(BigQueryIO.Read.fromQuery(getQuery()))
    .apply(ParDo.of(new DoFn<TableRow, Mutation>() {
        @Override
        public void processElement(ProcessContext c) {
            Mutation output = convertDataToRow(c.element());
            // convertDataToRow returns null for rows without BASM_AID;
            // outputting null would fail downstream, so skip those rows.
            if (output != null) {
                c.output(output);
            }
        }
    }))
    .apply(CloudBigtableIO.writeToTable(config));
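For reference, the config argument above is a CloudBigtableTableConfiguration. A minimal sketch of how one is typically built with the bigtable-hbase-dataflow connector of that era; all IDs below are placeholders, not values from our project:

// Sketch only: builder methods per the 2016-era bigtable-hbase-dataflow API.
CloudBigtableTableConfiguration config =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId("my-project")     // placeholder
        .withZoneId("us-central1-b")     // placeholder
        .withClusterId("my-cluster")     // placeholder
        .withTableId("my-table")         // placeholder
        .build();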
private static Mutation convertDataToRow(TableRow element) {
    LOG.info("element: " + element);
    LOG.info("BASM_segment_id: " + element.get("BASM_segment_id"));
    if (element.get("BASM_AID") != null) {
        // Note: String.getBytes() uses the platform default charset; HBase code
        // typically uses Bytes.toBytes(...) to pin UTF-8. BAS_category is read
        // unconditionally here, so it is assumed present whenever BASM_AID is.
        Put obj = new Put(getRowKey(element).getBytes())
            .addColumn(SEGMENT_FAMILY, SEGMENT_COLUMN_NAME, ((String) element.get("BAS_category")).getBytes());
        obj.addColumn(USER_FAMILY, "AID".getBytes(), ((String) element.get("BASM_AID")).getBytes());
        if (element.get("BASM_segment_id") != null) {
            obj.addColumn(SEGMENT_FAMILY, "segment_id".getBytes(), ((String) element.get("BASM_segment_id")).getBytes());
        }
        if (element.get("BAS_sub_category") != null) {
            obj.addColumn(SEGMENT_FAMILY, "sub_category".getBytes(), ((String) element.get("BAS_sub_category")).getBytes());
        }
        if (element.get("BAS_name") != null) {
            obj.addColumn(SEGMENT_FAMILY, "name".getBytes(), ((String) element.get("BAS_name")).getBytes());
        }
        if (element.get("BAS_description") != null) {
            obj.addColumn(SEGMENT_FAMILY, "description".getBytes(), ((String) element.get("BAS_description")).getBytes());
        }
        if (element.get("BAS_last_compute_day") != null) {
            // Guard BASM_krux_user_id separately: its presence is not implied by
            // BAS_last_compute_day, and an unguarded cast would throw an NPE.
            if (element.get("BASM_krux_user_id") != null) {
                obj.addColumn(USER_FAMILY, "Krux_User_id".getBytes(), ((String) element.get("BASM_krux_user_id")).getBytes());
            }
            obj.addColumn(SEGMENT_FAMILY, "last_compute_day".getBytes(), ((String) element.get("BAS_last_compute_day")).getBytes());
        }
        if (element.get("BAS_type") != null) {
            obj.addColumn(SEGMENT_FAMILY, "type".getBytes(), ((String) element.get("BAS_type")).getBytes());
        }
        if (element.get("BASM_REGID") != null) {
            obj.addColumn(USER_FAMILY, "REGID".getBytes(), ((String) element.get("BASM_REGID")).getBytes());
        }
        return obj;
    } else {
        // Rows without BASM_AID are skipped; the caller must not output null.
        return null;
    }
}
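A quick way to sanity-check the conversion locally (a sketch; SEGMENT_FAMILY, USER_FAMILY, and getRowKey are our own helpers referenced above and assumed to be on the classpath):

// Build a minimal BigQuery TableRow and confirm the conversion behaves as expected.
TableRow row = new TableRow();
row.set("BASM_AID", "user-123");    // required: rows without BASM_AID map to null
row.set("BAS_category", "sports");  // read unconditionally once BASM_AID is present
row.set("BASM_segment_id", "42");   // optional field

Mutation m = convertDataToRow(row);
assert m instanceof Put;                          // keyed by getRowKey(row)
assert convertDataToRow(new TableRow()) == null;  // no BASM_AID -> skipped, not output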
Here are the exceptions we're getting:
2016-08-22T21:47:33.469Z: Error: (84707221e08b977b): java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: StatusRuntimeException: 1 time,
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.output(SimpleParDoFn.java:162)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:287)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:449)
    at com.nytimes.adtech.dataflow.pipelines.BigTableSegmentData.processElement(BigTableSegmentData.java:70)
Caused by: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: StatusRuntimeException: 1 time,
    at com.google.cloud.dataflow.sdk.util.UserCodeException.wrap(UserCodeException.java:35)
    at com.google.cloud.dataflow.sdk.util.UserCodeException.wrapIf(UserCodeException.java:40)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.wrapUserCodeException(DoFnRunnerBase.java:368)
    at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:51)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138)
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:190)
    at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.output(SimpleParDoFn.java:160)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:287)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:449)
    at com.nytimes.adtech.dataflow.pipelines.BigTableSegmentData.processElement(BigTableSegmentData.java:70)
    at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138)
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:190)
    at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:226)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:167)
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:71)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:288)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:221)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:173)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:193)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:173)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:160)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: StatusRuntimeException: 1 time,
    at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.handleExceptions(BigtableBufferedMutator.java:389)
    at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.mutate(BigtableBufferedMutator.java:274)
    at com.google.cloud.bigtable.dataflow.CloudBigtableIO$CloudBigtableSingleTableBufferedWriteFn.processElement(CloudBigtableIO.java:966)
Exception copied from the Dataflow console:
(7e75740160102c05): java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: StatusRuntimeException: 1 time,
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.output(SimpleParDoFn.java:162)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:287)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:449)
    at com.nytimes.adtech.dataflow.pipelines.BigTableSegmentData.processElement(BigTableSegmentData.java:70)
Caused by: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: StatusRuntimeException: 1 time,
    at com.google.cloud.dataflow.sdk.util.UserCodeException.wrap(UserCodeException.java:35)
    at com.google.cloud.dataflow.sdk.util.UserCodeException.wrapIf(UserCodeException.java:40)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.wrapUserCodeException(DoFnRunnerBase.java:368)
    at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:51)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138)
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:190)
    at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.output(SimpleParDoFn.java:160)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:287)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:449)
    at com.nytimes.adtech.dataflow.pipelines.BigTableSegmentData.processElement(BigTableSegmentData.java:70)
    at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138)
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:190)
    at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:226)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:167)
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:71)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:288)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:221)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:173)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:193)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:173)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:160)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: StatusRuntimeException: 1 time,
    at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.handleExceptions(BigtableBufferedMutator.java:389)
    at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.mutate(BigtableBufferedMutator.java:274)
    at com.google.cloud.bigtable.dataflow.CloudBigtableIO$CloudBigtableSingleTableBufferedWriteFn.processElement(CloudBigtableIO.java:966)
2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop...
(the same entry repeated roughly 25 times in the console log)
Thanks in advance.
We talked offline. The issue here is that you have too many Dataflow workers relative to the number of Cloud Bigtable nodes in your cluster. You need to change that ratio, either by reducing the number of Dataflow workers or by contacting our team to increase your Cloud Bigtable resources.
Bigtable was holding up admirably relative to the number of Cloud Bigtable nodes you have, but the load from Dataflow was too high to handle reliably.
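If reducing Dataflow workers is the route you take, you can cap them when constructing the pipeline. A minimal sketch against the Dataflow Java SDK 1.x options used in your code (the count of 3 is illustrative; the right ratio depends on your cluster):

// Cap the worker pool so the aggregate write rate stays within what the
// Bigtable cluster can absorb. Equivalent to passing --maxNumWorkers=3.
DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .as(DataflowPipelineOptions.class);
options.setMaxNumWorkers(3);
Pipeline p = Pipeline.create(options);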
您可以在"CPU Usage" graph in the Google Cloud console. Anything over 80% of your capacity is likely to cause problems. If you get more Bigtable Quota, you can increase the number of nodes you have before you run the Dataflow job, and reduce it after the job is done. For example, Scio does that中查看您的使用情况。
==
关于"Also what happens to data inside the pipeline? Is it reprocessed? Or is it lost in transit to BigTable?":
Dataflow 尝试再次将数据发送到 Bigtable。在这些情况下,Dataflow 的重试机制将纠正临时问题。
不幸的是,当问题证明是 Cloud Bigtable 过载时,重试通过向 Bigtable 发送更多流量使问题更加复杂,从而加剧了问题。
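Separately, if you ever need more detail than the terse "Failed 1 action: StatusRuntimeException: 1 time", the HBase exception type itself carries per-action detail. A sketch of unpacking it (hedged: this applies where your own code catches the exception, e.g. around a hand-rolled BufferedMutator; in your pipeline CloudBigtableIO throws it internally, and LOG is whatever logger you already use):

// RetriesExhaustedWithDetailsException records each failed action's row,
// cause, and target server, which is far more actionable than the summary line.
static void logDetails(RetriesExhaustedWithDetailsException e) {
    for (int i = 0; i < e.getNumExceptions(); i++) {
        LOG.error("row=" + e.getRow(i)
            + " server=" + e.getHostnamePort(i), e.getCause(i));
    }
}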