gpt4 book ai didi

java - 获取 JAX-RS AsyncResponse,但稍后暂停

转载 作者:行者123 更新时间:2023-12-01 09:40:17 24 4
gpt4 key购买 nike

考虑以下代码来监听长轮询的更新:

Map<String, List<AsyncResponse>> tagMap = new ConcurrentGoodStuff();

// This endpoint listens for notifications of the tag
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@GET
@Path("listen/{tag}")
public void listenForUpdates(
@PathParam("tag") final String tag,
@Suspended final AsyncResponse response) {
tagMap.get(tag).add(response);
}

// This endpoint is for push-style notifications
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@PUT
@Path("update/{tag}/{value}")
public Response updateTag(
@PathParam("tag") final String tag,
@PathParam("value") final String value) {
for(AsyncResponse response : tagMap.get(tag)) {
// Resumes all previously suspended responses
response.resume(value);
}
return Response.ok("cool whatever").build();
}

客户端使用普通 Jersey 客户端的 AsyncInvoker 添加监听器,调用异步任务,然后另一个任务调用 update 方法。

当我测试这个时,我遇到了竞争条件。在 listenForUpdates() 上异步添加监听器后,我立即使用 updateTag() 在端点上进行同步更新。但是更新在添加监听器之前运行,并且异步响应无法恢复。

解决此问题的方法是在将响应添加到监听器之后调用响应的 suspend() 方法。但鉴于 @Suspending 提供了一个已经挂起的 AsyncResponse 对象,目前尚不清楚如何做到这一点。我应该怎么做才能使异步响应仅在添加到监听器之后才暂停?那实际上会调用 suspend 方法吗?我怎样才能让它与 Jersey 异步客户端一起工作,或者我应该使用不同的长轮询客户端?

对于解决方案,我对不同的库持开放态度,例如 Atmosphere 或 Guava。我不愿意在我的测试中添加 Thread.sleep() ,因为这是一个等待发生的间歇性故障。

最佳答案

我最终使用了 RxJava,但在此之前使用 BlockingQueue 提出了一个同样好的解决方案。而不是 Map 中的 List。事情是这样的:

ConcurrentMap<String, BlockingQueue<AsyncResponse>> tagMap = new ConcurrentGoodStuff();

// This endpoint initiates a listener array for the tag.
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@GET
@Path("initListen/{tag}")
public void listenForUpdates(
@PathParam("tag") final String tag) {
tagMap.putIfAbsent(tag, new LinkedBlockingQueue<>());
}

// This endpoint listens for notifications of the tag
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@GET
@Path("listen/{tag}")
public void listenForUpdates(
@PathParam("tag") final String tag,
@Suspended final AsyncResponse response) {
BlockingQueue<AsyncResponse> responses = tagMap.get(tag);

if (responses != null) {
responses.add(response);
}
}

// This endpoint is for push-style notifications
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@PUT
@Path("update/{tag}/{value}")
public Response updateTag(
@PathParam("tag") final String tag,
@PathParam("value") final String value) {
BlockingQueue<AsyncResponse> responses = tagMap.get(tag);

if (responses == null) {
return Response.noContent().build();
}
if (responses.isEmpty()) {
// Block-wait for an async listener
try {
AsyncResponse response = tagMap.poll(15, TimeUnit.SECONDS);

if (response == null) {
return Response.noContent().build();
}

response.resume(value);
} catch (InterruptedException e) {
return Response.noContent().build();
}
} else {
for (AsyncResponse response : responses) {
// Resumes all previously suspended responses
response.resume(value);
}
}
return Response.ok("cool whatever").build();
}

我还没有测试过这个确切的代码,但我过去使用过它的某个版本。只要先同步调用initListen端点,就可以调用异步listen端点,然后再调用同步update端点,就不会出现这样的情况。是任何重要的竞争条件。

update 端点中存在轻微的竞争条件提示,但影响较小。 responses 阻塞队列可能在迭代时变空,或者可能由多个源以不同方式更新。为了缓解这个问题,我使用了 drainTo(Collection)每个请求实例化数据结构上的方法。这仍然不能解决多个客户端可能尝试更新监听器的相同标签的用例,但我不需要这个用例。

关于java - 获取 JAX-RS AsyncResponse,但稍后暂停,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38510555/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com