gpt4 book ai didi

java - 在不影响性能或吞吐量的情况下对所有线程具有完全原子性

转载 作者:行者123 更新时间:2023-11-30 09:03:27 25 4
gpt4 key购买 nike

我有一个主机名列表,我应该通过从中创建正确的 URL 来进行调用。假设我在链表中​​有四个主机名(hostA、hostB、hostC、hostD)-

  • 执行 hostA url,如果 hostA 已启动,则获取数据并返回响应。
  • 但是如果 hostA 挂了,那么将 hostA 添加到主机名的阻止列表中,并确保没有其他线程正在调用 hostA。然后尝试执行 hostB url 并返回响应。
  • 但如果 hostB 也已关闭,则将 hostB 也添加到主机名阻止列表并重复相同的操作。

此外,我的应用程序中有一个后台线程在运行,它将包含 block 主机名列表(来 self 的另一个服务),我们不应该调用它,但它每 10 分钟运行一次,因此 block 主机名列表将仅在 10 分钟后更新,因此如果存在任何主机名阻止列表,那么我将不会从主线程调用该主机名,而是尝试调用另一个主机名。这意味着如果 hostA 被阻止,那么它将在阻止列表中显示 hostA 但如果 hostA 启动,则该列表将没有 hostA 在里面。

下面是我的后台线程代码,它从我的服务 URL 获取数据,并在我的应用程序启动后每 10 分钟继续运行一次。然后它将解析来自 URL 的数据并将其存储在 ClientData 类变量中 -

临时调度程序
public class TempScheduler {

// .. scheduledexecutors service code to start the background thread

// call the service and get the data and then parse
// the response.
private void callServiceURL() {
String url = "url";
RestTemplate restTemplate = new RestTemplate();
String response = restTemplate.getForObject(url, String.class);
parseResponse(response);
}

// parse the response and store it in a variable
private void parseResponse(String response) {
//...

// get the block list of hostnames
Map<String, List<String>> coloExceptionList = gson.fromJson(response.split("blocklist=")[1], Map.class);
List<String> blockList = new ArrayList<String>();
for(Map.Entry<String, List<String>> entry : coloExceptionList.entrySet()) {
for(String hosts : entry.getValue()) {
blockList.add(hosts);
}
}

// store the block list of hostnames which I am not supposed to make a call
ClientData.replaceBlockedHosts(blockList);
}
}

下面是我的 ClientData 类。 replaceBlockedHosts 方法只会被后台线程调用,这意味着只有一个作者。但是 isHostBlocked 方法将被主应用程序线程多次调用以检查特定主机名是否被阻止。而且 blockHost 方法会从 catch block 中多次调用,以将关闭的主机添加到 blockedHosts 列表中,所以我需要确保所有读取线程可以看到一致的数据,并且不会调用该停机主机,而是调用主机名链表中的下一个主机。

客户数据
public class ClientData {

// .. some other variables here which in turn used to decide the list of hostnames

private static final AtomicReference<ConcurrentHashMap<String, String>> blockedHosts =
new AtomicReference<ConcurrentHashMap<String, String>>(new ConcurrentHashMap<String, String>());

public static boolean isHostBlocked(String hostName) {
return blockedHosts.get().containsKey(hostName);
}

public static void blockHost(String hostName) {
blockedHosts.get().put(hostName, hostName);
}

public static void replaceBlockedHosts(List<String> hostNames) {
ConcurrentHashMap<String, String> newBlockedHosts = new ConcurrentHashMap<>();
for (String hostName : hostNames) {
newBlockedHosts.put(hostName, hostName);
}
blockedHosts.set(newBlockedHosts);
}
}

下面是我的主要应用程序线程代码,其中包含我应该调用的主机名列表。如果 hostname 为空或在阻止列表类别中,那么我将不会调用该特定主机名,而是尝试列表中的下一个主机名。

@Override
public DataResponse call() {

List<String> hostnames = new LinkedList<String>();

// .. some separate code here to populate the hostnames list
// from ClientData class

for (String hostname : hostnames) {

// If host name is null or host name is in block list category, skip sending request to this host
if (hostname == null || ClientData.isHostBlocked(hostname)) {
continue;
}

try {
String url = generateURL(hostname);

response = restTemplate.getForObject(url, String.class);

break;
} catch (RestClientException ex) {
// add host to block list,
// Is this call fully atomic and thread safe for blockHost method
// in ClientData class?
ClientData.blockHost(hostname);
}
}
}

我不需要在主线程关闭时调用主机名。我的后台线程也从我的一项服务中获取这些详细信息,每当任何服务器关闭时,它都会有主机名列表,这些主机名是 block 主机,只要它们启动,该列表就会更新。

而且,每当抛出任何 RestClientException 时,我都会将该主机名添加到 blockedHosts concurrentmap 中,因为我的后台线程每 10 分钟运行一次,这样 map 就不会10 分钟后才能获得此主机名。每当这个服务器恢复时,我的后台会自动更新这个列表。

我上面的主机名阻止列表代码是完全原子的和线程安全的吗?因为我想要的是 - 如果 hostA 已关闭,则在更新阻止的主机列表之前,其他线程不应调用 hostA。

最佳答案

请记住,与其他主机的通信比您在线程中所做的任何事情都要花费更多的时间。在这种情况下,我不会担心原子操作。

假设我们有线程 t1t2t1hostA 发送请求并等待响应。达到超时时,将抛出 RestClientException。现在在抛出异常和将该主机添加到被阻止主机列表之间的时间跨度非常小。 可能发生 t2 尝试在主机被阻止之前向 hostA 发送请求 - 但更有可能的是t2 已经在 t1 等待响应的很长时间内发送了它,这是您无法阻止的。

您可以尝试设置合理的超时时间。当然还有其他类型的错误不会等待超时,但即使是这些错误也比处理异常花费的时间更多。

使用 ConcurrentHashMap 是线程安全的,应该足以跟踪被阻止的主机。

AtomicReference 本身不会做很多事情,除非你使用像 compareAndSet 这样的方法,所以调用不是原子的(但如上所述不需要是在我看来)。如果你真的想在收到异常后立即阻止主机,你应该使用某种同步。你可以使用 synchronized set存储被阻止的主机。这仍然不能解决在实际检测到任何连接错误之前需要一些时间的问题。


关于更新:如评论中所述, future 超时应大于请求超时。否则 Callable 可能会被取消并且主机不会被添加到列表中。使用 Future.get 时,您甚至可能不需要超时,因为请求最终会成功或失败。

当主机 A 宕机时您看到许多异常的实际问题可能只是许多线程仍在等待主机 A 的响应。您只在开始请求之前检查是否有阻塞的主机,而不是在任何请求期间。仍在等待来自该主机的响应的任何线程将继续这样做,直到达到超时。

如果你想阻止这种情况,你可以尝试定期检查当前主机是否尚未被阻止。这是一个非常幼稚的解决方案,并且有点违背 future 的目的,因为它基本上是轮询。不过,它应该有助于理解一般问题。

// bad pseudo code 

DataTask dataTask = new DataTask(dataKeys, restTemplate);
future = service.submit(dataTask);

while(!future.isDone()) {
if( blockedHosts.contains(currentHost) ) {
// host unreachable, don't wait for http timeout
future.cancel();
}
thread.sleep(/* */);
}

更好的方法是在同一主机出现故障时向所有等待同一主机的 DataTask 线程发送中断,这样它们就可以中止请求并尝试下一个主机。

关于java - 在不影响性能或吞吐量的情况下对所有线程具有完全原子性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25614948/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com