gpt4 book ai didi

java - 并发轮询下游依赖并等待全部成功

转载 作者:行者123 更新时间:2023-12-02 09:31:48 27 4
gpt4 key购买 nike

我正在尝试编写一个简单的函数,该函数可以将多个消息长轮询到下游依赖项,而不会耗尽它,并且仅在所有消息成功时才存在。

我想出了一种方法,将每个消息轮询包装到可调用对象中,并使用 ExecutorService 提交可调用对象列表。


public void poll(final List<Long> messageIdList) {
ExecutorService executorService = Executors.newFixedThreadPool(messageIdList.size());
List<MessageStatusCallable> callables = messageIdList.stream()
.map(messageId -> new MessageStatusCallable(messageId)).collect(Collectors.toList());
boolean allSuccess = false;
try {
allSuccess = executorService.invokeAll(callables).stream().allMatch(success -> {
try {
return success.get().equals(Boolean.TRUE);
} catch (InterruptedException e) {
e.printStackTrace();
return false;
} catch (ExecutionException e) {
e.printStackTrace();
return false;
}
});
} catch (InterruptedException e) {
e.printStackTrace();
}

}

private class MessageStatusCallable implements Callable<Boolean> {
private Long messageId;
public MessageStatusCallable(Long messageId) {
this.messageId = messageId;
}

/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
@Override
public Boolean call() throws Exception {
String messageStatus = downstreamService.getMessageStatus(messageId);
while(messageStatus == null || !messageStatus.equals( STATUS_VALUE_SUCCEEDED) {
messageStatus = messageLogToControlServer.getMessageStatus(messageId);
Thread.sleep(TimeUnit.MICROSECONDS.toMillis(100));
}
LOG.info("Message: " + messageId + " Succeded");
return true;
}
}

我想知道是否有更好的方法来实现这一点,因为 Thread.sleep 是阻塞的并且丑陋的。

最佳答案

我不确定这是最好的解决方案,但我想到您可以使用 CountDownLatchScheduledExecutorService

    public void poll(final List<Long> messageIdList) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(messageIdList.size());
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(POOL_SIZE);
try {
for (Long messageId : messageIdList) {
MessageStatusCallable callable = new MessageStatusCallable(messageId, latch);
executorService.scheduleWithFixedDelay(
() -> {
String messageStatus = downstreamService.getMessageStatus(messageId);
if (STATUS_VALUE_SUCCEEDED.equals(messageStatus)) {
latch.countDown();
throw new CompletionException("Success - killing the task", null);
}
},
0, 100, TimeUnit.MILLISECONDS);
}

latch.await();

} finally {
executorService.shutdown();
}
}

除了为了简洁起见之外,我可能也不会将 Runnable 作为 lambda。

关于java - 并发轮询下游依赖并等待全部成功,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57897837/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com