gpt4 book ai didi

java - AWS SDK v2 SdkAsyncHttpClient 实现 使用 Java 11 java.net.http HttpClient sendAsync

转载 作者:行者123 更新时间:2023-12-05 06:23:24 24 4
gpt4 key购买 nike

我正在尝试使用 Java 11 的 java.net.http.HttpClient(特别是 sendAsync)来实现 SdkAsyncHttpClient。SdkAsyncHttpClient 只有一个需要实现的方法:CompletableFuture<Void> execute(AsyncExecuteRequest asyncExecuteRequest)。AsyncExecuteRequest 提供了获取 HTTP 请求详细信息的途径,其中最重要的是一个 SdkHttpContentPublisher。这属于响应式(Reactive)发布者/订阅者模型的范畴,而 HttpClient.sendAsync 似乎对此有内置支持。我似乎已经接近实现,但(至少)缺少一个关键步骤,因为我似乎无法让返回的 future 成功完成。

我认为我可能遗漏了一些基本的东西,无法以直接的方式将两者联系在一起,但到目前为止我还没有想到。

这是我对一个天真的(而且非常简单)实现的尝试:

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
import software.amazon.awssdk.utils.AttributeMap;

import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

import static java.net.http.HttpClient.Version.HTTP_1_1;
import static java.net.http.HttpClient.Version.HTTP_2;
import static software.amazon.awssdk.http.Protocol.HTTP2;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.CONNECTION_TIMEOUT;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.PROTOCOL;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.READ_TIMEOUT;

public class JavaAsyncHttpClient implements SdkAsyncHttpClient {
/** JDK HTTP client that carries out every request issued through this adapter. */
private final HttpClient httpClient;

/**
 * Creates the adapter and configures the JDK client from the given SDK options.
 *
 * @param options SDK HTTP options; {@code CONNECTION_TIMEOUT} and {@code PROTOCOL} are read
 */
public JavaAsyncHttpClient(AttributeMap options) {
    HttpClient.Builder clientBuilder = HttpClient.newBuilder();
    clientBuilder.connectTimeout(options.get(CONNECTION_TIMEOUT));
    // Any protocol other than HTTP/2 falls back to HTTP/1.1.
    if (options.get(PROTOCOL) == HTTP2) {
        clientBuilder.version(HTTP_2);
    } else {
        clientBuilder.version(HTTP_1_1);
    }
    this.httpClient = clientBuilder.build();
}

/**
 * Translates the SDK request into a {@code java.net.http} request and sends it with
 * {@code HttpClient.sendAsync}.
 *
 * <p>Headers are copied except {@code Content-Length} and {@code Host}. POST and PUT stream the
 * SDK's request content publisher as the body; DELETE, HEAD and OPTIONS are sent without a body;
 * PATCH is rejected. Any method without a case below keeps the builder's default.
 *
 * <p>NOTE(review): this is the question's non-working attempt. The method never references the
 * request's response handler, so neither response headers nor the response body are handed back
 * to the SDK.
 *
 * @param asyncExecuteRequest the SDK request together with its request content publisher
 * @return the {@code sendAsync} future with its result mapped to {@code null}
 * @throws UnsupportedOperationException synchronously, when the request method is PATCH
 */
@Override
public CompletableFuture<Void> execute(AsyncExecuteRequest asyncExecuteRequest) {
SdkHttpRequest request = asyncExecuteRequest.request();
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder().uri(request.getUri());
for (Map.Entry<String, List<String>> header : request.headers().entrySet()) {
// Skip headers the JDK client refuses to accept, to avoid
// java.lang.IllegalArgumentException: restricted header name: "Content-Length"
if (!header.getKey().equalsIgnoreCase("Content-Length") && !header.getKey().equalsIgnoreCase("Host")) {
for (String headerVal : header.getValue()) {
requestBuilder = requestBuilder.header(header.getKey(), headerVal);
}
}
}

switch (request.method()) {
case POST:
requestBuilder = requestBuilder.POST(HttpRequest.BodyPublishers.fromPublisher(
toFlowPublisher(asyncExecuteRequest.requestContentPublisher())));
break;
case PUT:
requestBuilder = requestBuilder.PUT(HttpRequest.BodyPublishers.fromPublisher(
toFlowPublisher(asyncExecuteRequest.requestContentPublisher())));
break;
case DELETE:
requestBuilder = requestBuilder.DELETE();
break;
case HEAD:
requestBuilder = requestBuilder.method("HEAD", HttpRequest.BodyPublishers.noBody());
break;
case PATCH:
throw new UnsupportedOperationException("PATCH not supported");
case OPTIONS:
requestBuilder = requestBuilder.method("OPTIONS", HttpRequest.BodyPublishers.noBody());
break;
}
// Open question from the author: is BodyHandlers.ofPublisher() needed here, or is that a dead end?
// How can the AWS Publisher/Subscribers be linked up with the HttpClient Flow types?
// NOTE(review): the collecting subscriber is subscribed to the REQUEST content publisher here ...
Subscriber<ByteBuffer> subscriber = new BaosSubscriber(new CompletableFuture<>());
asyncExecuteRequest.requestContentPublisher().subscribe(subscriber);
HttpRequest httpRequest = requestBuilder.build();
// ... and the same subscriber instance is then also registered as the RESPONSE body subscriber,
// so it receives onSubscribe from two different sources. The response itself is discarded below.
return httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.fromSubscriber(toFlowSubscriber(subscriber)))
.thenApply(voidHttpResponse -> null);
}

/**
 * Adapts a Reactive Streams subscriber of single buffers to the {@code Flow.Subscriber} of
 * buffer batches that {@code HttpResponse.BodyHandlers.fromSubscriber} expects.
 *
 * <p>Every buffer of a batch is forwarded in order. The previous version forwarded only the
 * first buffer, silently dropping the rest of the response body, and threw on an empty batch.
 *
 * @param subscriber downstream subscriber that receives the individual buffers
 * @return a {@code Flow.Subscriber} delegating all signals to {@code subscriber}
 */
private Flow.Subscriber<? super List<ByteBuffer>> toFlowSubscriber(Subscriber<ByteBuffer> subscriber) {
    return new Flow.Subscriber<List<ByteBuffer>>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscriber.onSubscribe(toAwsSubscription(subscription));
        }

        @Override
        public void onNext(List<ByteBuffer> item) {
            // One upstream item is a whole batch of buffers; hand each one downstream.
            // An empty batch simply forwards nothing.
            item.forEach(subscriber::onNext);
        }

        @Override
        public void onError(Throwable throwable) {
            subscriber.onError(throwable);
        }

        @Override
        public void onComplete() {
            subscriber.onComplete();
        }
    };
}

/**
 * Wraps a JDK {@code Flow.Subscription} as a Reactive Streams {@code Subscription}.
 * Demand and cancellation are passed through unchanged.
 *
 * @param subscription the JDK subscription to delegate to
 * @return a Reactive Streams view of {@code subscription}
 */
private Subscription toAwsSubscription(Flow.Subscription subscription) {
    Subscription bridged = new Subscription() {
        @Override
        public void cancel() {
            subscription.cancel();
        }

        @Override
        public void request(long demand) {
            subscription.request(demand);
        }
    };
    return bridged;
}

/**
 * Exposes the SDK request content publisher as a JDK {@code Flow.Publisher}.
 * Each subscription request is forwarded to the SDK publisher through an adapted subscriber.
 *
 * @param requestContentPublisher the SDK publisher of request body buffers
 * @return a {@code Flow.Publisher} backed by {@code requestContentPublisher}
 */
private Flow.Publisher<ByteBuffer> toFlowPublisher(SdkHttpContentPublisher requestContentPublisher) {
    return new Flow.Publisher<ByteBuffer>() {
        @Override
        public void subscribe(Flow.Subscriber<? super ByteBuffer> flowSubscriber) {
            requestContentPublisher.subscribe(toAwsSubscriber(flowSubscriber));
        }
    };
}

/**
 * Wraps a JDK {@code Flow.Subscriber} as a Reactive Streams {@code Subscriber}.
 * All four signals are delegated one-to-one; the subscription is adapted on the way through.
 *
 * @param subscriber the JDK subscriber that ultimately receives the signals
 * @return a Reactive Streams subscriber delegating to {@code subscriber}
 */
private Subscriber<? super ByteBuffer> toAwsSubscriber(Flow.Subscriber<? super ByteBuffer> subscriber) {
    Subscriber<ByteBuffer> bridged = new Subscriber<ByteBuffer>() {
        @Override
        public void onSubscribe(Subscription awsSubscription) {
            Flow.Subscription flowSubscription = toFlowSubscription(awsSubscription);
            subscriber.onSubscribe(flowSubscription);
        }

        @Override
        public void onNext(ByteBuffer chunk) {
            subscriber.onNext(chunk);
        }

        @Override
        public void onError(Throwable failure) {
            subscriber.onError(failure);
        }

        @Override
        public void onComplete() {
            subscriber.onComplete();
        }
    };
    return bridged;
}

/**
 * Wraps a Reactive Streams {@code Subscription} as a JDK {@code Flow.Subscription}.
 * Demand and cancellation are passed through unchanged.
 *
 * @param subscription the Reactive Streams subscription to delegate to
 * @return a JDK view of {@code subscription}
 */
private Flow.Subscription toFlowSubscription(Subscription subscription) {
    Flow.Subscription bridged = new Flow.Subscription() {
        @Override
        public void cancel() {
            subscription.cancel();
        }

        @Override
        public void request(long demand) {
            subscription.request(demand);
        }
    };
    return bridged;
}

/** Does nothing. */
@Override
public void close() {
    // intentionally empty
}


/**
 * Subscriber that appends every received buffer to an in-memory byte stream and publishes
 * that stream through a future once the upstream completes.
 */
private static class BaosSubscriber implements Subscriber<ByteBuffer> {
    /** Completed with the collected bytes on completion, or exceptionally on error. */
    private final CompletableFuture<ByteArrayOutputStream> result;
    /** Accumulates the bytes of all buffers seen so far. */
    private final ByteArrayOutputStream collected = new ByteArrayOutputStream();
    /** Upstream subscription, stored by {@link #onSubscribe}. */
    private Subscription upstream;

    private BaosSubscriber(CompletableFuture<ByteArrayOutputStream> streamFuture) {
        result = streamFuture;
    }

    /** Stores the subscription and requests {@code Long.MAX_VALUE} items. */
    @Override
    public void onSubscribe(Subscription subscription) {
        upstream = subscription;
        upstream.request(Long.MAX_VALUE);
    }

    /** Copies the buffer's bytes into the stream, then requests more from upstream. */
    @Override
    public void onNext(ByteBuffer byteBuffer) {
        try {
            byte[] chunk = BinaryUtils.copyBytesFrom(byteBuffer);
            collected.write(chunk);
            upstream.request(Long.MAX_VALUE);
        } catch (IOException writeFailure) {
            // Should never happen
            result.completeExceptionally(writeFailure);
        }
    }

    /** Fails the result future with the upstream error. */
    @Override
    public void onError(Throwable t) {
        result.completeExceptionally(t);
    }

    /** Completes the result future with everything collected. */
    @Override
    public void onComplete() {
        result.complete(collected);
    }
}

我在这里错过了什么?返回一个以 null 完成的 future 是符合 SdkAsyncHttpClient 规范的。很明显,HTTP 响应需要以某种方式发送给 AWS 端的订阅者——但这正是我迷路的地方。

编辑:刚刚通过谷歌搜索发现:https://github.com/rmcsoft/j11_aws_http_client/blob/63f05326990317c59f1863be55942054769b437e/src/main/java/com/rmcsoft/aws/http/proxy/BodyHandlerProxy.java - 去看看答案是否在里面。

最佳答案

当我问这个问题时,我并不知道这片土地已经被人踩过了。Nikita Skornyakov(Github 上的 @rmcsoft)已经实现了这个确切的东西(一个使用 Java 11 的 HTTP 客户端 java.net.http 的 SdkAsyncHttpClient 实现)。它可以在这里找到:https://github.com/rmcsoft/j11_aws_http_client (MIT 许可)。

为了完整起见,这里给出一个自包含的(你很可能永远不该使用的)Java 实现:

package com.dow.as2;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
import software.amazon.awssdk.utils.AttributeMap;

import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

import static java.net.http.HttpClient.Version.HTTP_1_1;
import static java.net.http.HttpClient.Version.HTTP_2;
import static software.amazon.awssdk.http.Protocol.HTTP2;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.CONNECTION_TIMEOUT;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.PROTOCOL;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.READ_TIMEOUT;

public class JavaAsyncHttpClient implements SdkAsyncHttpClient {
/** Name reported by {@code clientName()}. */
private static final String CLIENT_NAME = "JavaNetAsyncHttpClient";

/** JDK HTTP client that carries out every request issued through this adapter. */
private final HttpClient httpClient;

/**
 * Creates the adapter and configures the JDK client from the given SDK options.
 * Instances are obtained through the builder or {@code create()}.
 *
 * @param options SDK HTTP options; {@code CONNECTION_TIMEOUT} and {@code PROTOCOL} are read
 */
private JavaAsyncHttpClient(AttributeMap options) {
    HttpClient.Builder clientBuilder = HttpClient.newBuilder();
    clientBuilder.connectTimeout(options.get(CONNECTION_TIMEOUT));
    // Any protocol other than HTTP/2 falls back to HTTP/1.1.
    if (options.get(PROTOCOL) == HTTP2) {
        clientBuilder.version(HTTP_2);
    } else {
        clientBuilder.version(HTTP_1_1);
    }
    this.httpClient = clientBuilder.build();
}

/**
 * Starts a new client configuration.
 *
 * @return a fresh builder with no options set
 */
public static Builder builder() {
    Builder fresh = new DefaultBuilder();
    return fresh;
}

/**
 * Creates a client backed by a {@link HttpClient}, built from a builder on which no
 * option has been set explicitly.
 *
 * @return a new {@link JavaAsyncHttpClient}
 */
public static SdkAsyncHttpClient create() {
    DefaultBuilder untouched = new DefaultBuilder();
    return untouched.build();
}

/**
 * Translates the SDK request into a {@code java.net.http} request, sends it asynchronously and
 * feeds the response to the SDK's response handler.
 *
 * <p>Headers are copied except {@code Content-Length} and {@code Host}. POST, PUT and PATCH
 * stream the SDK's request content publisher as the body; the other methods are sent without one.
 * Response headers are delivered to the handler by {@code BodyHandlerProxy}; the body publisher
 * is delivered through {@code onStream} once the JDK client has produced it.
 *
 * <p>On failure the handler's {@code onError} is invoked and the returned future completes
 * exceptionally. Previously the failure was swallowed and the future completed normally with
 * {@code null}, so callers could not tell a failed exchange from a successful one.
 *
 * @param asyncExecuteRequest the SDK request, its content publisher and its response handler
 * @return a future completed with {@code null} after {@code onStream} has been called, or
 *         completed exceptionally if sending or handling the response failed
 */
@Override
public CompletableFuture<Void> execute(AsyncExecuteRequest asyncExecuteRequest) {
    SdkHttpRequest request = asyncExecuteRequest.request();
    HttpRequest.Builder requestBuilder = HttpRequest.newBuilder().uri(request.getUri());
    for (Map.Entry<String, List<String>> header : request.headers().entrySet()) {
        // avoid java.lang.IllegalArgumentException: restricted header name: "Content-Length"
        if (!header.getKey().equalsIgnoreCase("Content-Length") && !header.getKey().equalsIgnoreCase("Host")) {
            for (String headerVal : header.getValue()) {
                requestBuilder = requestBuilder.header(header.getKey(), headerVal);
            }
        }
    }

    switch (request.method()) {
        case GET:
            requestBuilder = requestBuilder.GET();
            break;
        case POST:
            requestBuilder = requestBuilder.POST(new BodyPublisherProxy(asyncExecuteRequest.requestContentPublisher()));
            break;
        case PUT:
            requestBuilder = requestBuilder.PUT(new BodyPublisherProxy(asyncExecuteRequest.requestContentPublisher()));
            break;
        case PATCH:
            // The builder has no PATCH shortcut, but the generic method(...) call accepts it.
            requestBuilder = requestBuilder.method("PATCH",
                    new BodyPublisherProxy(asyncExecuteRequest.requestContentPublisher()));
            break;
        case DELETE:
            requestBuilder = requestBuilder.DELETE();
            break;
        case HEAD:
            requestBuilder = requestBuilder.method("HEAD", HttpRequest.BodyPublishers.noBody());
            break;
        case OPTIONS:
            requestBuilder = requestBuilder.method("OPTIONS", HttpRequest.BodyPublishers.noBody());
            break;
        default:
            // Keep the builder's default method for anything not listed above.
            break;
    }

    SdkAsyncHttpResponseHandler responseHandler = asyncExecuteRequest.responseHandler();
    BodyHandlerProxy bodyHandler = new BodyHandlerProxy(responseHandler);
    return httpClient
            .sendAsync(requestBuilder.build(), bodyHandler)
            .thenApply(HttpResponse::body)
            .thenApply(this::toAwsPublisher)
            .thenAccept(responseHandler::onStream)
            // whenComplete (unlike exceptionally) keeps the failure on the returned future.
            .whenComplete((ignored, failure) -> {
                if (failure != null) {
                    responseHandler.onError(failure);
                }
            });
}

/**
 * Presents a JDK {@code Flow.Subscription} as a Reactive Streams {@code Subscription}.
 * Both {@code request} and {@code cancel} are forwarded as-is.
 *
 * @param subscription the JDK subscription to delegate to
 * @return a Reactive Streams view of {@code subscription}
 */
private Subscription toAwsSubscription(Flow.Subscription subscription) {
    Subscription adapted = new Subscription() {
        @Override
        public void cancel() {
            subscription.cancel();
        }

        @Override
        public void request(long demand) {
            subscription.request(demand);
        }
    };
    return adapted;
}

/**
 * Presents a Reactive Streams {@code Subscriber} as a JDK {@code Flow.Subscriber}.
 * All four signals are delegated one-to-one; the subscription is adapted on the way through.
 *
 * @param subscriber the Reactive Streams subscriber that ultimately receives the signals
 * @return a JDK subscriber delegating to {@code subscriber}
 */
private Flow.Subscriber<? super ByteBuffer> toFlowSubscriber(Subscriber<? super ByteBuffer> subscriber) {
    Flow.Subscriber<ByteBuffer> adapted = new Flow.Subscriber<ByteBuffer>() {
        @Override
        public void onSubscribe(Flow.Subscription flowSubscription) {
            Subscription awsSubscription = toAwsSubscription(flowSubscription);
            subscriber.onSubscribe(awsSubscription);
        }

        @Override
        public void onNext(ByteBuffer chunk) {
            subscriber.onNext(chunk);
        }

        @Override
        public void onError(Throwable failure) {
            subscriber.onError(failure);
        }

        @Override
        public void onComplete() {
            subscriber.onComplete();
        }
    };
    return adapted;
}

/**
 * Presents a JDK {@code Flow.Publisher} as a Reactive Streams {@code Publisher}.
 * Subscribing adapts the given subscriber and passes it to the JDK publisher.
 *
 * @param publisher the JDK publisher to delegate to
 * @return a Reactive Streams publisher backed by {@code publisher}
 */
private Publisher<ByteBuffer> toAwsPublisher(Flow.Publisher<ByteBuffer> publisher) {
    return awsSubscriber -> publisher.subscribe(toFlowSubscriber(awsSubscriber));
}

/** Does nothing. */
@Override
public void close() {
    // intentionally empty
}

/**
 * Identifies this HTTP client implementation.
 *
 * @return the constant {@code CLIENT_NAME}
 */
@Override
public String clientName() {
    return JavaAsyncHttpClient.CLIENT_NAME;
}
/**
 * Builder that collects explicitly configured options and merges them with defaults when the
 * client is created.
 */
private static final class DefaultBuilder implements Builder {
    /** Options set explicitly on this builder. */
    private final AttributeMap.Builder standardOptions = AttributeMap.builder();

    private DefaultBuilder() {
    }

    /**
     * Records the read timeout under {@code READ_TIMEOUT}.
     *
     * <p>NOTE(review): the {@code JavaAsyncHttpClient} constructor in this file reads only
     * {@code CONNECTION_TIMEOUT} and {@code PROTOCOL}, so this value appears to be unused.
     *
     * @param socketTimeout the timeout as a {@link Duration}
     * @return this object for method chaining
     */
    public Builder socketTimeout(Duration socketTimeout) {
        standardOptions.put(READ_TIMEOUT, socketTimeout);
        return this;
    }

    /** Bean-style variant of {@link #socketTimeout(Duration)}. */
    public void setSocketTimeout(Duration socketTimeout) {
        standardOptions.put(READ_TIMEOUT, socketTimeout);
    }

    /**
     * Records the connect timeout under {@code CONNECTION_TIMEOUT}.
     *
     * <p>NOTE(review): the value is later passed to {@code HttpClient.Builder#connectTimeout},
     * which is documented to reject non-positive durations; confirm before passing zero.
     *
     * @param connectionTimeout the timeout as a {@link Duration}
     * @return this object for method chaining
     */
    public Builder connectionTimeout(Duration connectionTimeout) {
        standardOptions.put(CONNECTION_TIMEOUT, connectionTimeout);
        return this;
    }

    /** Bean-style variant of {@link #connectionTimeout(Duration)}. */
    public void setConnectionTimeout(Duration connectionTimeout) {
        standardOptions.put(CONNECTION_TIMEOUT, connectionTimeout);
    }

    /**
     * Records the HTTP protocol under {@code PROTOCOL}.
     *
     * @param protocol the protocol to use
     * @return this object for method chaining
     */
    public Builder protocol(Protocol protocol) {
        standardOptions.put(PROTOCOL, protocol);
        return this;
    }

    /**
     * Creates the client from the options set on this builder, merged first with the given
     * service defaults and then with {@link SdkHttpConfigurationOption#GLOBAL_HTTP_DEFAULTS}.
     *
     * @param serviceDefaults service specific defaults, keyed by constants defined in
     *                        {@link SdkHttpConfigurationOption}
     * @return a new {@link SdkAsyncHttpClient}
     */
    @Override
    public SdkAsyncHttpClient buildWithDefaults(AttributeMap serviceDefaults) {
        AttributeMap explicit = standardOptions.build();
        AttributeMap withServiceDefaults = explicit.merge(serviceDefaults);
        AttributeMap resolved = withServiceDefaults.merge(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS);
        return new JavaAsyncHttpClient(resolved);
    }
}

/**
 * JDK body handler that reports the response status and headers to the SDK response handler
 * and then supplies a {@code BodySubscriberProxy} for the body.
 */
private static class BodyHandlerProxy implements HttpResponse.BodyHandler<Flow.Publisher<ByteBuffer>> {

    /** SDK handler notified when response headers are available. */
    private final SdkAsyncHttpResponseHandler handler;

    /**
     * @param responseHandler SDK response handler; must not be {@code null}
     * @throws NullPointerException if {@code responseHandler} is {@code null}
     */
    private BodyHandlerProxy(SdkAsyncHttpResponseHandler responseHandler) {
        handler = Objects.requireNonNull(responseHandler);
    }

    /**
     * Passes the response info to the SDK handler's {@code onHeaders}, wrapped as an SDK
     * response object, and returns a new subscriber for the body.
     */
    @Override
    public HttpResponse.BodySubscriber<Flow.Publisher<ByteBuffer>> apply(HttpResponse.ResponseInfo responseInfo) {
        SdkHttpHeadersProxy sdkHeaders = new SdkHttpHeadersProxy(responseInfo);
        handler.onHeaders(sdkHeaders);
        return new BodySubscriberProxy();
    }
}

/**
 * Minimal holder around a {@code Flow.Subscriber} that allows the reference to be dropped,
 * e.g. once the subscription is cancelled or the stream has terminated.
 */
static final class SubscriberRef {

    // volatile: the reference is cleared from the downstream's cancel()/termination path and
    // read from the response-delivery path, which are not guaranteed to share a thread.
    volatile Flow.Subscriber<? super ByteBuffer> ref;

    /**
     * @param subscriber the subscriber to hold; may later be removed with {@link #clear()}
     */
    SubscriberRef(Flow.Subscriber<? super ByteBuffer> subscriber) {
        ref = subscriber;
    }

    /**
     * @return the held subscriber, or {@code null} once {@link #clear()} has been called
     */
    Flow.Subscriber<? super ByteBuffer> get() {
        return ref;
    }

    /**
     * Drops the held subscriber.
     *
     * @return the subscriber held before the call, or {@code null} if already cleared
     */
    Flow.Subscriber<? super ByteBuffer> clear() {
        Flow.Subscriber<? super ByteBuffer> res = ref;
        ref = null;
        return res;
    }
}

/**
 * Subscription handed to the downstream subscriber. It delegates to the upstream subscription
 * and drops the downstream reference when cancelled.
 */
static final class SubscriptionRef implements Flow.Subscription {

    final Flow.Subscription subscription;
    final SubscriberRef subscriberRef;

    /**
     * @param subscription  upstream subscription to delegate to
     * @param subscriberRef holder of the downstream subscriber
     */
    SubscriptionRef(Flow.Subscription subscription,
                    SubscriberRef subscriberRef) {
        this.subscriberRef = subscriberRef;
        this.subscription = subscription;
    }

    /** Forwards demand upstream unless the downstream reference has been cleared. */
    @Override
    public void request(long n) {
        if (subscriberRef.get() == null) {
            return;
        }
        subscription.request(n);
    }

    /** Cancels upstream first, then clears the downstream reference. */
    @Override
    public void cancel() {
        subscription.cancel();
        subscriberRef.clear();
    }

    /** Calls {@code onSubscribe(this)} on the downstream subscriber if one is still held. */
    void subscribe() {
        Flow.Subscriber<?> downstream = subscriberRef.get();
        if (downstream == null) {
            return;
        }
        downstream.onSubscribe(this);
    }

    @Override
    public String toString() {
        String upstreamType = subscription.getClass().getName();
        int upstreamIdentity = System.identityHashCode(subscription);
        return String.format("SubscriptionRef/%s@%s", upstreamType, upstreamIdentity);
    }
}

// Adapted from jdk.internal.net.http.ResponseSubscribers.PublishingBodySubscriber
/**
 * Body subscriber whose "body" is a {@code Flow.Publisher<ByteBuffer>}: the buffers received
 * from the HTTP client are re-published, one by one, to a single downstream subscriber.
 *
 * <p>Coordination is done with futures:
 * <ul>
 *   <li>{@code subscriptionCF} completes when the HTTP client calls {@code onSubscribe};</li>
 *   <li>{@code subscribedCF} completes once the downstream subscriber has received its
 *       {@code onSubscribe};</li>
 *   <li>{@code completionCF} completes when the upstream terminates (normally or with an error);
 *       the termination is relayed downstream only after {@code subscribedCF} has completed.</li>
 * </ul>
 */
private static class BodySubscriberProxy implements HttpResponse.BodySubscriber<Flow.Publisher<ByteBuffer>> {

// Completed with the upstream subscription in onSubscribe().
private final CompletableFuture<Flow.Subscription>
subscriptionCF = new CompletableFuture<>();
// Completed with the downstream holder after the downstream's onSubscribe() returned.
private final CompletableFuture<SubscriberRef>
subscribedCF = new CompletableFuture<>();
// Holds the one accepted downstream subscriber; stays null until subscribe() is called.
private AtomicReference<SubscriberRef>
subscriberRef = new AtomicReference<>();
// The body becomes available as soon as the upstream subscription exists; the publisher's
// subscribe method is this object's private subscribe(...).
private final CompletableFuture<Flow.Publisher<ByteBuffer>> body =
subscriptionCF.thenCompose(
(s) -> CompletableFuture.completedFuture(this::subscribe));

// Completed (normally or exceptionally) when the upstream terminates.
private final CompletableFuture<Void> completionCF;

BodySubscriberProxy() {
completionCF = new CompletableFuture<>();
// Relay the terminal signal downstream, but only once a downstream subscriber is subscribed.
completionCF.whenComplete(
(r, t) -> subscribedCF.thenAccept(s -> complete(s, t)));
}

/** Returns the stage that yields the re-publishing body publisher. */
public CompletionStage<Flow.Publisher<ByteBuffer>> getBody() {
return body;
}


// This is a callback for the subscribedCF.
// Do not call directly!
// Clears the downstream reference and delivers onError(t) when t is non-null, otherwise
// onComplete(); an exception thrown by onComplete() is passed to onError().
private void complete(SubscriberRef ref, Throwable t) {
Flow.Subscriber<?> s = ref.clear();
// maybe null if subscription was cancelled
if (s == null) {
return;
}
if (t != null) {
s.onError(t);
return;
}

try {
s.onComplete();
} catch (Throwable x) {
s.onError(x);
}
}

// Marks the stream as failed; a null error is replaced by an IllegalArgumentException.
private void signalError(Throwable err) {
completionCF.completeExceptionally(err != null ? err : new IllegalArgumentException("null throwable"));
}

// Marks the stream as completed normally.
private void signalComplete() {
completionCF.complete(null);
}

// Implementation of the body publisher's subscribe(). Only the first subscriber is accepted;
// any later one receives a no-op subscription followed by an IllegalStateException.
private void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
if (subscriber == null) {
throw new IllegalArgumentException("subscriber must not be null");
}
SubscriberRef ref = new SubscriberRef(subscriber);
if (subscriberRef.compareAndSet(null, ref)) {
// Wait for the upstream subscription, then hand the downstream a wrapper around it.
subscriptionCF.thenAccept((s) -> {
SubscriptionRef subscription = new SubscriptionRef(s, ref);
try {
subscription.subscribe();
subscribedCF.complete(ref);
} catch (Throwable t) {
// The downstream's onSubscribe (or the completion callbacks) threw: cancel upstream.
subscription.cancel();
}
});
} else {
subscriber.onSubscribe(new Flow.Subscription() {
@Override
public void request(long n) {
}

@Override
public void cancel() {
}
});
subscriber.onError(new IllegalStateException("This publisher has already one subscriber"));
}
}

/** Records the upstream subscription, which in turn makes the body available. */
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscriptionCF.complete(subscription);
}

/**
 * Forwards each buffer of the batch to the downstream subscriber, if it is still present.
 * Any exception marks the stream as failed and cancels the upstream subscription.
 */
@Override
public void onNext(List<ByteBuffer> item) {
try {
// NOTE(review): subscriberRef.get() is null until subscribe() has run; in that case
// ref.get() throws a NullPointerException, which the catch below turns into signalError.
SubscriberRef ref = subscriberRef.get();
Flow.Subscriber<? super ByteBuffer> subscriber = ref.get();
if (subscriber != null) { // may be null if subscription was cancelled.
item.forEach(subscriber::onNext);
}
} catch (Throwable err) {
signalError(err);
subscriptionCF.thenAccept(Flow.Subscription::cancel);
}
}

/** Marks the stream as failed with the given error. */
@Override
public void onError(Throwable throwable) {
// onError can be called before request(1), and therefore can
// be called before subscriberRef is set.
signalError(throwable);
}

/** Marks the stream as completed, or as failed if no subscription was ever received. */
@Override
public void onComplete() {
// cannot be called before onSubscribe()
if (!subscriptionCF.isDone()) {
signalError(new InternalError("onComplete called before onSubscribed"));
} else {
// onComplete can be called before request(1),
// and therefore can be called before subscriberRef
// is set.
signalComplete();
}
}
}

/**
 * SDK response view over the JDK's {@code HttpResponse.ResponseInfo}. It exposes the status
 * code and headers only; status text and content are always reported as absent.
 */
private static class SdkHttpHeadersProxy implements SdkHttpFullResponse {

    /** Source of the status code and headers. */
    private final HttpResponse.ResponseInfo responseInfo;

    /**
     * @param responseInfo JDK response info; must not be {@code null}
     * @throws NullPointerException if {@code responseInfo} is {@code null}
     */
    private SdkHttpHeadersProxy(HttpResponse.ResponseInfo responseInfo) {
        this.responseInfo = Objects.requireNonNull(responseInfo);
    }

    /** @return always empty */
    @Override
    public Optional<String> statusText() {
        return Optional.empty();
    }

    /** @return the status code reported by the JDK response info */
    @Override
    public int statusCode() {
        return responseInfo.statusCode();
    }

    /** @return the header map of the JDK response info */
    @Override
    public Map<String, List<String>> headers() {
        return responseInfo.headers().map();
    }

    /** @return a builder pre-populated with this view's headers and status code */
    @Override
    public Builder toBuilder() {
        Map<String, List<String>> headerMap = headers();
        int status = statusCode();
        return SdkHttpResponse
                .builder()
                .headers(headerMap)
                .statusCode(status);
    }

    /** @return always empty */
    @Override
    public Optional<AbortableInputStream> content() {
        return Optional.empty(); // will be available at later stage
    }
}

/**
 * JDK body publisher backed by the SDK's request content publisher. It is an inner
 * (non-static) class because subscribing uses the enclosing instance's {@code toAwsSubscriber}.
 */
private class BodyPublisherProxy implements HttpRequest.BodyPublisher {
    /** SDK publisher that produces the request body. */
    private final SdkHttpContentPublisher publisher;

    /**
     * @param publisher SDK content publisher; must not be {@code null}
     * @throws NullPointerException if {@code publisher} is {@code null}
     */
    private BodyPublisherProxy(SdkHttpContentPublisher publisher) {
        this.publisher = Objects.requireNonNull(publisher);
    }

    /** @return the SDK publisher's content length, or {@code -1} when it reports none */
    @Override
    public long contentLength() {
        Optional<Long> declaredLength = publisher.contentLength();
        return declaredLength.orElse(-1L);
    }

    /** Adapts the JDK subscriber and subscribes it to the SDK publisher. */
    @Override
    public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
        Subscriber<? super ByteBuffer> awsSubscriber = toAwsSubscriber(subscriber);
        publisher.subscribe(awsSubscriber);
    }
}

/**
 * Presents a Reactive Streams {@code Subscription} as a JDK {@code Flow.Subscription}.
 * Both {@code request} and {@code cancel} are forwarded as-is.
 *
 * @param subscription the Reactive Streams subscription to delegate to
 * @return a JDK view of {@code subscription}
 */
private Flow.Subscription toFlowSubscription(Subscription subscription) {
    Flow.Subscription adapted = new Flow.Subscription() {
        @Override
        public void cancel() {
            subscription.cancel();
        }

        @Override
        public void request(long demand) {
            subscription.request(demand);
        }
    };
    return adapted;
}

/**
 * Presents a JDK {@code Flow.Subscriber} as a Reactive Streams {@code Subscriber}.
 * All four signals are delegated one-to-one; the subscription is adapted on the way through.
 *
 * @param subscriber the JDK subscriber that ultimately receives the signals
 * @return a Reactive Streams subscriber delegating to {@code subscriber}
 */
private Subscriber<? super ByteBuffer> toAwsSubscriber(Flow.Subscriber<? super ByteBuffer> subscriber) {
    Subscriber<ByteBuffer> adapted = new Subscriber<ByteBuffer>() {
        @Override
        public void onSubscribe(Subscription awsSubscription) {
            Flow.Subscription flowSubscription = toFlowSubscription(awsSubscription);
            subscriber.onSubscribe(flowSubscription);
        }

        @Override
        public void onNext(ByteBuffer chunk) {
            subscriber.onNext(chunk);
        }

        @Override
        public void onError(Throwable failure) {
            subscriber.onError(failure);
        }

        @Override
        public void onComplete() {
            subscriber.onComplete();
        }
    };
    return adapted;
}
}

我建议使用前面链接的 j11_aws_http_client,而不是上面这个"怪物"(例如,上面的代码只处理了一小部分受限制的 header)。上面的代码几乎完全是从那个 Github 项目复制粘贴过来的。

如果有一种方法可以使用 java.net.http.HttpResponse.BodySubscribers.ofPublisher(即 Flow.Publisher<List<ByteBuffer>>),则可以大大简化实现。

关于java - AWS SDK v2 SdkAsyncHttpClient 实现 使用 Java 11 java.net.http HttpClient sendAsync,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58385478/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com