gpt4 book ai didi

apache-beam - 如何使用 Apache Beam (Java) 进行异步 Http 调用?

转载 作者:行者123 更新时间:2023-12-02 00:35:19 30 4
gpt4 key购买 nike

输入的PCollection是http requests,是一个有界数据集。我想在 ParDo 中进行异步 http 调用(Java),解析响应并将结果放入输出 PCollection 中。我的代码如下。获取异常如下。

我想不通是什么原因。需要一个向导....

java.util.concurrent.CompletionException: java.lang.IllegalStateException: Can't add element ValueInGlobalWindow{value=streaming.mapserver.backfill.EnrichedPoint@2c59e, pane=PaneInfo.NO_FIRING} to committed bundle in PCollection Call Map Server With Rate Throttle/ParMultiDo(ProcessRequests).output [PCollection]

代码:

public class ProcessRequestsFn extends DoFn<PreparedRequest,EnrichedPoint> {
private static AsyncHttpClient _HttpClientAsync;
private static ExecutorService _ExecutorService;

static{

AsyncHttpClientConfig cg = config()
.setKeepAlive(true)
.setDisableHttpsEndpointIdentificationAlgorithm(true)
.setUseInsecureTrustManager(true)
.addRequestFilter(new RateLimitedThrottleRequestFilter(100,1000))
.build();

_HttpClientAsync = asyncHttpClient(cg);

_ExecutorService = Executors.newCachedThreadPool();

}


@DoFn.ProcessElement
public void processElement(ProcessContext c) {

PreparedRequest request = c.element();

if(request == null)
return;

_HttpClientAsync.prepareGet((request.getRequest()))
.execute()
.toCompletableFuture()
.thenApply(response -> { if(response.getStatusCode() == HttpStatusCodes.STATUS_CODE_OK){
return response.getResponseBody();
} return null; } )
.thenApply(responseBody->
{
List<EnrichedPoint> resList = new ArrayList<>();
/*some process logic here*/
System.out.printf("%d enriched points back\n", result.length());
}
return resList;

})
.thenAccept(resList -> {
for (EnrichedPoint enrichedPoint : resList) {
c.output(enrichedPoint);
}
})
.exceptionally(ex->{
System.out.println(ex);
return null;
});

}
}

最佳答案

Scio库实现了处理异步操作的 DoFnBaseAsyncDoFn可能会为您提供所需的处理。由于您正在处理 CompletableFuture,因此还请查看 JavaAsyncDoFn .

请注意,您不一定需要使用 Scio 库,但您可以采用 BaseAsyncDoFn 的主要思想因为它独立于 Scio 库的其余部分。

关于apache-beam - 如何使用 Apache Beam (Java) 进行异步 Http 调用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49884949/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com