How to do Async Http Call with Apache Beam (Java)?
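The input PCollection consists of HTTP requests and is a bounded dataset. I want to make asynchronous HTTP calls (Java) inside a ParDo, parse the responses, and put the results into the output PCollection. My code is below, and I get the following exception.
I haven't been able to figure out why. Any guidance is appreciated.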
java.util.concurrent.CompletionException: java.lang.IllegalStateException: Can't add element ValueInGlobalWindow{value=streaming.mapserver.backfill.EnrichedPoint@2c59e, pane=PaneInfo.NO_FIRING} to committed bundle in PCollection Call Map Server With Rate Throttle/ParMultiDo(ProcessRequests).output [PCollection]
Code:
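import static org.asynchttpclient.Dsl.asyncHttpClient;
import static org.asynchttpclient.Dsl.config;

import com.google.api.client.http.HttpStatusCodes;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.beam.sdk.transforms.DoFn;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncHttpClientConfig;
import org.asynchttpclient.extras.guava.RateLimitedThrottleRequestFilter;

// PreparedRequest and EnrichedPoint are my own classes (not shown).
public class ProcessRequestsFn extends DoFn<PreparedRequest, EnrichedPoint> {
    private static AsyncHttpClient _HttpClientAsync;
    private static ExecutorService _ExecutorService;

    static {
        AsyncHttpClientConfig cg = config()
                .setKeepAlive(true)
                .setDisableHttpsEndpointIdentificationAlgorithm(true)
                .setUseInsecureTrustManager(true)
                .addRequestFilter(new RateLimitedThrottleRequestFilter(100, 1000))
                .build();
        _HttpClientAsync = asyncHttpClient(cg);
        _ExecutorService = Executors.newCachedThreadPool();
    }

    @DoFn.ProcessElement
    public void processElement(ProcessContext c) {
        PreparedRequest request = c.element();
        if (request == null) {
            return;
        }
        _HttpClientAsync.prepareGet(request.getRequest())
                .execute()
                .toCompletableFuture()
                // Keep the body only for HTTP 200; anything else becomes null.
                .thenApply(response -> response.getStatusCode() == HttpStatusCodes.STATUS_CODE_OK
                        ? response.getResponseBody()
                        : null)
                .thenApply(responseBody -> {
                    List<EnrichedPoint> resList = new ArrayList<>();
                    if (responseBody != null) {
                        /*some process logic here*/
                    }
                    System.out.printf("%d enriched points back%n", resList.size());
                    return resList;
                })
                .thenAccept(resList -> {
                    // This callback runs on an AsyncHttpClient thread, usually after
                    // processElement has already returned.
                    for (EnrichedPoint enrichedPoint : resList) {
                        c.output(enrichedPoint);
                    }
                })
                .exceptionally(ex -> {
                    System.out.println(ex);
                    return null;
                });
    }
}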
The Scio library implements a DoFn that handles asynchronous operations. Its BaseAsyncDoFn may provide the handling you need. Since you're working with CompletableFuture, also take a look at JavaAsyncDoFn.
Note that you don't have to use the Scio library itself; you can adopt the main idea of BaseAsyncDoFn, since it is independent of the rest of Scio.
The problem you're running into is that you are producing output outside the context of a processElement or finishBundle call.
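To see why the error appears only once the response arrives, here is a toy model in plain Java. ToyContext, its committed flag and LateOutputDemo are hypothetical stand-ins, not Beam classes; they only mimic a runner that rejects output after a bundle has been committed. Completing the future after the "commit" yields the same CompletionException wrapping an IllegalStateException that the question's stack trace shows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class LateOutputDemo {
    /** Hypothetical stand-in for a bundle's output context (not Beam's class). */
    static final class ToyContext {
        final List<String> outputs = new ArrayList<>();
        boolean committed = false;

        void output(String value) {
            if (committed) {
                throw new IllegalStateException("Can't add element " + value + " to committed bundle");
            }
            outputs.add(value);
        }
    }

    /** Mirrors the question: attach a callback that outputs, then return immediately. */
    static CompletableFuture<Void> processElement(ToyContext c, CompletableFuture<String> response) {
        return response.thenAccept(c::output);
    }

    public static void main(String[] args) {
        ToyContext c = new ToyContext();
        CompletableFuture<String> response = new CompletableFuture<>(); // HTTP call still in flight
        CompletableFuture<Void> pending = processElement(c, response);

        c.committed = true;           // the "runner" commits the bundle after processElement returns
        response.complete("point-1"); // the response arrives later and the callback runs now

        try {
            pending.join();
        } catch (CompletionException e) {
            // prints: java.lang.IllegalStateException: Can't add element point-1 to committed bundle
            System.out.println(e.getCause());
        }
    }
}
```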
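You need to buffer all results in memory, emit the ones that are ready eagerly during subsequent processElement calls, and emit the rest in finishBundle by blocking until all calls have completed.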
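A minimal sketch of that buffering idea in plain Java follows. AsyncOutputBuffer is a hypothetical helper, not part of Beam or Scio, and it does not reproduce BaseAsyncDoFn's code. The async client's threads only complete futures; results are emitted solely by whichever thread calls drainCompleted or awaitAllAndDrain, which in a DoFn would be processElement or finishBundle. M is assumed per-request metadata (for example the input element's timestamp).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/** Hypothetical bundle-scoped buffer for in-flight async calls (not a Beam or Scio class). */
public class AsyncOutputBuffer<M, T> {
    private static final class Pending<M, T> {
        final M meta;
        final CompletableFuture<List<T>> future;

        Pending(M meta, CompletableFuture<List<T>> future) {
            this.meta = meta;
            this.future = future;
        }
    }

    // Not thread-safe on purpose: only the caller's thread touches this list.
    private final List<Pending<M, T>> pending = new ArrayList<>();

    /** Registers an in-flight request (call from processElement). */
    public void submit(M meta, CompletableFuture<List<T>> future) {
        pending.add(new Pending<>(meta, future));
    }

    /** Emits results of requests that have already finished, without blocking. */
    public void drainCompleted(BiConsumer<M, T> out) {
        Iterator<Pending<M, T>> it = pending.iterator();
        while (it.hasNext()) {
            Pending<M, T> p = it.next();
            if (p.future.isDone()) {
                it.remove();
                // join() rethrows a failed request as a CompletionException.
                for (T value : p.future.join()) {
                    out.accept(p.meta, value);
                }
            }
        }
    }

    /** Blocks until every request has finished, then emits everything (call from finishBundle). */
    public void awaitAllAndDrain(BiConsumer<M, T> out) {
        for (Pending<M, T> p : pending) {
            p.future.exceptionally(ex -> null).join(); // only waits; failures surface in drainCompleted
        }
        drainCompleted(out);
    }

    public int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        AsyncOutputBuffer<String, Integer> buffer = new AsyncOutputBuffer<>();
        buffer.submit("req-a", CompletableFuture.completedFuture(List.of(1, 2)));
        buffer.submit("req-b", CompletableFuture.supplyAsync(() -> List.of(3)));
        List<String> emitted = new ArrayList<>();
        buffer.drainCompleted((m, v) -> emitted.add(m + ":" + v));   // like processElement: emit what's ready
        buffer.awaitAllAndDrain((m, v) -> emitted.add(m + ":" + v)); // like finishBundle: wait for the rest
        System.out.println(emitted); // prints [req-a:1, req-a:2, req-b:3]
    }
}
```

Wiring this into a DoFn is likewise an assumption about how you'd adapt it: create the buffer in a @StartBundle method, call submit and then drainCompleted from @ProcessElement, and call awaitAllAndDrain from @FinishBundle. In Beam's Java SDK, FinishBundleContext.output takes an explicit timestamp and window, which is why each request carries metadata; for bounded input in the global window that could be the element's timestamp plus GlobalWindow.INSTANCE. Blocking in finishBundle also means a failed request fails the bundle and the runner can handle it, instead of the error only being printed in exceptionally.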