gpt4 book ai didi

java - Vertx/RxJava/WebClient/ApiGateway/Reactive

转载 作者:行者123 更新时间:2023-12-01 18:20:40 24 4
gpt4 key购买 nike

我正在使用 vert.xRxJava 开发 Apigateway。我想发送 2 个 Api 的响应式(Reactive)请求,从它们那里获取响应并通过 HttpServer 发送组合的 JSON。但 onComplete() 执行较早并返回空 JSON。我认为问题是由 vert.x异步字符引起的,但我不知道到底出了什么问题。

这是我的方法:

private void dispatchBoth(RoutingContext routingContext) {

Observer<String> observer = new Observer<String>() {

JsonArray jsonArray = new JsonArray();

@Override
public void onSubscribe(Disposable disposable) {
System.out.println("Start");

}

@Override
public void onNext(String s) {
Thread t = new Thread(() -> {
if(s=="/api/userApi/selectAllUsers") {

WebClient client = WebClient.create(vertx);
client
.get(8081, "localhost", s)
.send(ar->{
if (ar.succeeded()) {
HttpResponse<Buffer> response = ar.result();

jsonArray.addAll(response.bodyAsJsonArray());
System.out.println(jsonArray.encodePrettily());

} else {
System.out.println("Something went wrong " + ar.cause().getMessage());
}
});

}else if(s=="/api/holidayApi/selectAllHolidays") {
WebClient client = WebClient.create(vertx);
client
.get(8080, "localhost", s)
.send(ar -> {

if (ar.succeeded()) {

HttpResponse<Buffer> response = ar.result();

jsonArray.addAll(response.bodyAsJsonArray());
// System.out.println(jsonArray.encodePrettily());

} else {
System.out.println("Something went wrong " + ar.cause().getMessage());
}
});
}
});
t.start();
}
@Override
public void onError(Throwable throwable) {

}

@Override
public void onComplete() {
System.out.println(jsonArray.encodePrettily());
routingContext.response().end(jsonArray.encodePrettily());
}
};
Observable.fromArray(com).subscribe(observer);
}

这就是我在控制台上得到的输出:

[ ]
[ {
"holidayId" : 2,
"userId" : 3,
"place" : "Poland",
"date" : {
"year" : 2016,
"month" : "DECEMBER",
"dayOfMonth" : 29,
"dayOfWeek" : "THURSDAY",
"era" : "CE",
"dayOfYear" : 364,
"leapYear" : true,
"monthValue" : 12,
"chronology" : {
"id" : "ISO",
"calendarType" : "iso8601"
}
}
}, {
"holidayId" : 10,
"userId" : 1,
"place" : "Netherland",
"date" : {
"year" : 2020,
"month" : "JANUARY",
"dayOfMonth" : 21,
"dayOfWeek" : "TUESDAY",
"era" : "CE",
"dayOfYear" : 21,
"leapYear" : true,
"monthValue" : 1,
"chronology" : {
"id" : "ISO",
"calendarType" : "iso8601"
}
}
}, {
"userId" : 1,
"name" : "Kacper",
"phone_number" : "667667202"
}, {
"userId" : 3,
"name" : "Kamil",
"phone_number" : "6734583443"
}, {
"userId" : 4,
"name" : "Janek",
"phone_number" : "231253575"
}, {
"userId" : 5,
"name" : "Grzegorz",
"phone_number" : "123456789"
}, {
"userId" : 6,
"name" : "Justin",
"phone_number" : "111000111"
}, {
"userId" : 8,
"name" : "Mike",
"phone_number" : "997"
}, {
"userId" : 9,
"name" : "Gorge",
"phone_number" : "991"
} ]

最佳答案

onComplete 按时执行:当字符串输入流完成时。您需要另一种方式来等待所有 I/O 操作完成的那一刻。这是一个棘手的部分,我不知道 Vertx 或 RxJava 是否可以做到这一点,但标准 Java API 可以,使用 CompletableFuture。因此,我们创建从 CompletableFuture 到 Handler 的适配器,该适配器保存一次 I/O 操作的结果:

class HandelerFuture extends CompletableFuture<JsonArray> 
implements Handler<AsyncResult<HttpResponse<Buffer>>> {
@Override
public void handle(AsyncResult<HttpResponse<Buffer>> ar) {
if (ar.succeeded()) {
JsonArray array = ar.result().bodyAsJsonArray();
super.complete(array);
} else {
super.completeExceptionally(ar.cause());
}
}
}

此外,您不需要将 onNext 方法的主体包装在 Tread 中。其次,您不需要检查传递的 url 字符串是什么,因为它没有任何区别。第三,您仅使用 Observer 来处理 url 字符串列表 - 这过于复杂。普通循环就足够了。

CompletableFuture<HandelerFuture[]> dispatchBoth(String... urls) {
ArrayList<HandelerFuture> futures = new ArrayList<>(); // all results
for (String url : urls) {
HandelerFuture future = new HandelerFuture();
futures.add(future);
WebClient client = WebClient.create(vertx);
client.get(8081, "localhost", url)
.send(future);
}
CompletableFuture all = new CompletableFuture();
HandelerFuture[] array = futures.toArray(new HandelerFuture[0]);
CompletableFuture.allOf(array)
.thenRunAsync(() -> all.complete(array));
return all;
}

然后可以按如下方式运行:

    CompletableFuture<HandelerFuture[]> future = dispatchBoth(com);
HandelerFuture[] results = future.get();
JsonArray finalArray;
for (HandelerFuture result:results) {
try {
// extract partial json array
JsonArray partialArray = result.get();
// combine partialArray with finalArray somehow
} catch (Exception e) {
// this is the exception got in handle() method as ar.cause().
e.printStackTrace();
}
}
routingContext.response().end(finalArray.encodePrettily());

你没有告诉你如何组合 json 数组,所以我没有实现它。

关于java - Vertx/RxJava/WebClient/ApiGateway/Reactive,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60297653/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com