
java - ServiceBusSessionReceiverAsyncClient throws IllegalStateException during close

Reposted. Author: 行者123. Updated: 2023-12-03 06:21:02

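When I use ServiceBusSessionReceiverAsyncClient to receive a single message from a Service Bus queue, an IllegalStateException is raised. Its message says that something tried to add credits to a link that was already closed.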

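I use take(1) and next() to turn the Flux into a single-result Mono. The documentation says that take(1) on a stream closes the stream after the first result, which is what I want.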

My receiver code:

private <T extends IWocTransaction> Mono<Optional<T>> responseAsync(String transactionId, Class<T> clazz) {

    var asyncClient = sbClientBuilder.connectionString(sbConnectionString)
        .sessionReceiver()
        .queueName("my-callback-queue")
        .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
        .buildAsyncClient();

    var msgStream = Flux.usingWhen(asyncClient.acceptSession(transactionId),
        receiver -> receiver.receiveMessages(),
        receiver -> Mono.fromRunnable(receiver::close)
    );

    return Mono.from(msgStream
            .timeout(timeout)
            .take(1)
            .next()
        ).map(message -> {
            var json = message.getBody().toString();

            try {
                var val = objectMapper.readValue(json, clazz);
                return val != null ? Optional.of(val) : Optional.<T>empty();
            } catch (Exception e) {
                log.error("Error deserializing response from string {}", json, e);
                return Optional.<T>empty();
            }
        })
        .doOnError(t -> {
            if (t instanceof TimeoutException) {
                log.error("Timeout error waiting on API callback {}", kv("ApiTimeout", timeout.toString()), t);
            } else {
                log.error("Error waiting for async callback", t);
            }
        }).onErrorReturn(Optional.empty());
}

This code works, but I get this exception every time it runs:

13:46:27.122 [io-executor-thread-1] INFO  c.a.m.s.ServiceBusClientBuilder - {"az.sdk.message":"Closing a dependent client.","numberOfOpenClients":1}
13:46:27.127 [receiver-0-1] INFO c.a.m.s.ServiceBusSessionReceiver - {"az.sdk.message":"There is no lock token.","sessionId":"adfadsr","messageId":"fb70e81e4d304b8fb34092440243554a"}
13:46:27.138 [receiver-0-1] INFO c.a.m.s.ServiceBusReceiverAsyncClient - Removing receiver links.
13:46:27.167 [receiver-0-1] ERROR c.a.c.a.i.ReactorReceiver - {"az.sdk.message":"Cannot add credits to closed link: adfadsr","exception":"Cannot add credits to closed link: adfadsr","connectionId":"MF_57a511_1680201985206","entityPath":"woc-callback-queue","linkName":"adfadsr"}
13:46:27.175 [receiver-0-1] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Cannot add credits to closed link: adfadsr
Caused by: java.lang.IllegalStateException: Cannot add credits to closed link: adfadsr
at com.azure.core.amqp.implementation.ReactorReceiver.addCredits(ReactorReceiver.java:227)
at com.azure.messaging.servicebus.ServiceBusSessionReceiver.lambda$new$2(ServiceBusSessionReceiver.java:92)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:138)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:447)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:84)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.trySchedule(FluxPublishOn.java:312)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:237)
at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
Caused by: java.lang.IllegalStateException: Cannot add credits to closed link: adfadsr

at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at io.micronaut.reactive.reactor.instrument.ReactorInstrumentation.lambda$init$0(ReactorInstrumentation.java:62)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
13:46:27.194 [reactor-executor-1] INFO c.a.c.a.i.handler.ReceiveLinkHandler - {"az.sdk.message":"onLinkRemoteClose","connectionId":"MF_57a511_1680201985206","errorCondition":null,"errorDescription":null,"linkName":"adfadsr","entityPath":"woc-callback-queue"}
13:46:27.198 [reactor-executor-1] INFO c.a.c.a.i.ReactorSession - {"az.sdk.message":"Complete. Removing receive link.","connectionId":"MF_57a511_1680201985206","linkName":"adfadsr","entityPath":"woc-callback-queue"}
13:46:27.199 [reactor-executor-1] INFO c.a.c.a.i.handler.ReceiveLinkHandler - {"az.sdk.message":"onLinkFinal","connectionId":"MF_57a511_1680201985206","linkName":"adfadsr","entityPath":"woc-callback-queue"}

How can I prevent the IllegalStateException from being thrown, or at least handle it?

Best answer

The "IllegalStateException: Cannot add credits to closed link" is not supposed to be thrown to the application; it should only be logged.

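It happens occasionally because several threads are running at the same time. The first is the non-blocking IO_thread, which processes message frames and sends credits through flow frames. The second is the Worker_thread, which delivers messages to the application. The third is the application's handler_thread, on which the application's responseAsync is invoked.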

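What happens is that when responseAsync closes the client from [any]_thread, the IO_thread in the background may still be doing some work at the moment it receives the close request. The error shows up in the log when the IO_thread is in the middle of sending a flow frame while another part of the client is closing. This log entry can be ignored.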
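The ordering described above can be reproduced with a simplified, standard-library-only model. This is only a sketch: Link, its addCredits method, and the thread name are hypothetical stand-ins rather than the SDK's ReactorReceiver, and a latch forces the "close first, then add credits" interleaving that in the real client only happens occasionally.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class ClosedLinkRaceSketch {

    /** Hypothetical stand-in for a receive link; not the SDK implementation. */
    static final class Link {
        private final AtomicBoolean closed = new AtomicBoolean();

        void close() {
            closed.set(true);
        }

        void addCredits(int credits) {
            if (closed.get()) {
                throw new IllegalStateException("Cannot add credits to closed link");
            }
        }
    }

    /** Closes the link on the caller's thread, then lets the IO thread try to add credits. */
    public static String run() {
        Link link = new Link();
        CountDownLatch closedSignal = new CountDownLatch(1);
        AtomicReference<String> logged = new AtomicReference<>("nothing logged");

        Thread ioThread = new Thread(() -> {
            try {
                closedSignal.await();   // wait until the link has been closed
                link.addCredits(1);     // late flow-frame work on a closed link
            } catch (IllegalStateException e) {
                // The failure stays on the IO thread: it is recorded, not rethrown to the caller.
                logged.set("logged: " + e.getMessage());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "io-thread");
        ioThread.start();

        link.close();              // the application side closes the client
        closedSignal.countDown();  // only now may the IO thread proceed

        try {
            ioThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return logged.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "logged: Cannot add credits to closed link"
    }
}
```

The caller that closed the link never sees the exception; it only surfaces where the late work ran, which matches the point that the entry is a log message rather than an application error.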

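It also looks like the application is designed to create and dispose of a client for every request. That means the application opens and closes a TCP connection (to Service Bus) on every request, which can be expensive.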
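To make the cost difference concrete, here is a minimal sketch that counts how many connections are opened when a client is built per request versus built once and reused. The Client class and its receive method are hypothetical placeholders (constructing one stands in for opening a TCP connection); they are not Azure SDK types.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SharedClientSketch {

    static final AtomicInteger connectionsOpened = new AtomicInteger();

    /** Hypothetical client; constructing it stands in for opening a TCP connection. */
    static final class Client implements AutoCloseable {
        Client() {
            connectionsOpened.incrementAndGet();
        }

        String receive(String sessionId) {
            return "reply for " + sessionId;
        }

        @Override
        public void close() {
            // A real client would tear down its connection here.
        }
    }

    /** One client per request: every request opens and closes a connection. */
    public static int perRequest(int requests) {
        connectionsOpened.set(0);
        for (int i = 0; i < requests; i++) {
            try (Client client = new Client()) {
                client.receive("session-" + i);
            }
        }
        return connectionsOpened.get();
    }

    /** One client reused for all requests: a single connection is opened. */
    public static int shared(int requests) {
        connectionsOpened.set(0);
        try (Client client = new Client()) {
            for (int i = 0; i < requests; i++) {
                client.receive("session-" + i);
            }
        }
        return connectionsOpened.get();
    }

    public static void main(String[] args) {
        System.out.println("per request: " + perRequest(3)); // prints "per request: 3"
        System.out.println("shared: " + shared(3));          // prints "shared: 1"
    }
}
```

Applied to the code in the question, the analogous change would presumably be to build the session receiver client once (for example as a field) and only call acceptSession(transactionId) per request, closing the shared client at application shutdown.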

Source: a similar question on Stack Overflow about java - ServiceBusSessionReceiverAsyncClient throwing IllegalStateException during close: https://stackoverflow.com/questions/75892274/
