gpt4 book ai didi

java - 进程池的应用程序级负载平衡器

转载 作者:塔克拉玛干 更新时间:2023-11-03 01:28:46 27 4
gpt4 key购买 nike

我们有传统的 C++ 单体软件,其作用类似于请求-回复 TCP 服务器。该软件是单线程的,可以同时处理一个请求。目前,我们有固定的此类流程池来并行服务多个客户端。

由于消息量很大,客户端会定期遇到请求处理的严重延迟。目前我们有一个想法通过在客户和工作人员之间引入一种代理来解决这个问题:

Proxy

我们希望此代理具有以下功能:

  1. 应用程序级负载平衡:通过检查请求上下文和客户端 ID 在工作人员之间分配请求
  2. 控制和监控工作进程的生命周期
  3. 产生额外的工作进程(在不同的 PC 上)以处理高峰

事实上,我们希望它的行为类似于 Java 中的 ExecutorService,但使用工作进程而不是线程。目前的想法是基于 Jetty 或 Tomcat 服务器在 Java 中实现这个平衡器,内部消息队列和 servlet 将请求转发给工作进程。

但我想知道:是否存在可以自动执行此过程的现有解决方案(最好是 Java)?实现此类代理的最简单方法是什么?

更新:

我对请求上下文所做的工作 - 好吧,那个 C++ 服务器确实是一个困惑的软件。事实上,每次它接收到不同的上下文时,它都会相应地更新内部缓存以匹配该上下文。例如,如果您请求该服务器为您提供一些英文数据,那么它会将内部缓存重新加载为英文。如果下一个请求是法语的,那么它会再次重新加载缓存。显然,我想通过更智能地转发请求来最大程度地减少缓存重新加载的次数。

通信协议(protocol)是自制的(基于 TCP/IP),但从中提取上下文部分相对容易。

目前负载均衡是在客户端实现的,因此每个客户端都配置为知道所有服务器节点并以循环方式向它们发送请求。这种方法存在几个问题:客户端的连接管理复杂,与彼此不认识的多个客户端一起工作不正确,无法管理节点生命周期。我们无法通过重构解决列出的问题。

很可能我们最终会使用自制的转发解决方案,但我仍然想知道是否有至少用于流程管理的现有产品?理想情况下,这将是 Java 应用程序服务器,它可以:

  • 生成子节点(另一个 Java 进程)
  • 监控他们的生命周期
  • 通过某种协议(protocol)与他们沟通

也许这个功能已经在一些现有的应用服务器中实现了?这将大大简化问题!

最佳答案

关于流程管理,您可以通过混合 Apache Commons Exec 的功能轻松实现您的目标库可以帮助产生新的 worker 实例 Apache Commons Pool将管理正在运行的实例的库。

实现非常简单,因为公共(public)池将确保您一次可以使用一个对象,直到它返回到池中。如果对象没有返回到池中,公共(public)池将为您生成新实例。您可以通过添加看门狗服务(来自 apache commons exec)来控制工作人员的生命周期 - 看门狗可以杀死一段时间未使用的实例,或者您也可以使用公共(public)池本身,例如通过调用 pool.clearOldest()。您还可以通过调用 pool.getNumActive() 查看当前处理了多少请求(有多少工作人员处于 Activity 状态)。引用 GenericKeyedObjectPool 的 javadoc 以查看更多内容。

可以通过在 Tomcat 服务器上运行一个简单的 servlet 来完成实现。这个 servlet 将实例化池并通过调用 pool.borowObject(parameters) 简单地向池请求新的 worker。在参数内部,您可以定义您的工作人员应该具有哪些特征来处理请求(在您的情况下,参数应该包括语言)。如果没有这样的工作人员可用(例如没有法语工作人员)池将为您生成新的工作人员。此外,如果有一个工作人员但该工作人员当前正在处理另一个请求,则池也会为您生成一个新工作人员(因此您将有两个工作人员处理相同的语言)。当您调用 pool.returnObject(parameters, instance) 时,Worker 将准备好处理新请求。

整个实现只用了不到 200 行代码(完整代码见下文)。该代码包括工作进程从外部被杀死或崩溃的情况(请参阅 WorkersFactory.activateObject())。

恕我直言:使用 Apache Cammel 对您来说不是一个好的选择,因为它太大了,而且它被设计为不同消息格式之间的中介总线。你不需要做转换,你不需要处理不同格式的消息。寻求简单的解决方案。

package com.myapp;

import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
import org.apache.commons.pool2.KeyedPooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.Objects;

public class BalancingServlet extends javax.servlet.http.HttpServlet {

private final WorkersPool workersPool;

public BalancingServlet() {
workersPool = new WorkersPool(new WorkersFactory());
}


protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {

}

protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
response.getWriter().println("Balancing");

String language = request.getParameter("language");
String someOtherParam = request.getParameter("other");
WorkerParameters workerParameters = new WorkerParameters(language, someOtherParam);

String requestSpecificParam1 = request.getParameter("requestParam1");
String requestSpecificParam2 = request.getParameter("requestParam2");

try {
WorkerInstance workerInstance = workersPool.borrowObject(workerParameters);
workerInstance.handleRequest(requestSpecificParam1, requestSpecificParam2);
workersPool.returnObject(workerParameters, workerInstance);

} catch (Exception e) {
e.printStackTrace();
}


}
}

class WorkerParameters {
private final String workerLangauge;
private final String someOtherParam;

WorkerParameters(String workerLangauge, String someOtherParam) {
this.workerLangauge = workerLangauge;
this.someOtherParam = someOtherParam;
}

public String getWorkerLangauge() {
return workerLangauge;
}

public String getSomeOtherParam() {
return someOtherParam;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

WorkerParameters that = (WorkerParameters) o;

return Objects.equals(this.workerLangauge, that.workerLangauge) && Objects.equals(this.someOtherParam, that.someOtherParam);
}

@Override
public int hashCode() {
return Objects.hash(workerLangauge, someOtherParam);
}
}

class WorkerInstance {
private final Thread thread;
private WorkerParameters workerParameters;

public WorkerInstance(final WorkerParameters workerParameters) {
this.workerParameters = workerParameters;

// launch the process here
System.out.println("Spawing worker for language: " + workerParameters.getWorkerLangauge());

// use commons Exec to spawn your process using command line here

// something like


thread = new Thread(new Runnable() {
@Override
public void run() {
try {
String line = "C:/Windows/notepad.exe" ;
final CommandLine cmdLine = CommandLine.parse(line);

final DefaultExecutor executor = new DefaultExecutor();
executor.setExitValue(0);
// ExecuteWatchdog watchdog = new ExecuteWatchdog(60000); // if you want to kill process running too long
// executor.setWatchdog(watchdog);

int exitValue = executor.execute(cmdLine);
System.out.println("process finished with exit code: " + exitValue);
} catch (IOException e) {
throw new RuntimeException("Problem while executing application for language: " + workerParameters.getWorkerLangauge(), e);
}


}
});

thread.start();


System.out.println("Process spawned for language: " + workerParameters.getWorkerLangauge());


}

public void handleRequest(String someRequestParam1, String someRequestParam2) {
System.out.println("Handling request for extra params: " + someRequestParam1 + ", " + someRequestParam2);

// communicate with your application using parameters here

// communcate via tcp or whatever protovol you want using extra parameters: someRequestParam1, someRequestParam2


}

public boolean isRunning() {
return thread.isAlive();
}


}

class WorkersFactory extends BaseKeyedPooledObjectFactory<WorkerParameters, WorkerInstance> {

@Override
public WorkerInstance create(WorkerParameters parameters) throws Exception {
return new WorkerInstance(parameters);
}

@Override
public PooledObject<WorkerInstance> wrap(WorkerInstance worker) {
return new DefaultPooledObject<WorkerInstance>(worker);
}

@Override
public void activateObject(WorkerParameters worker, PooledObject<WorkerInstance> p)
throws Exception {
System.out.println("Activating worker for lang: " + worker.getWorkerLangauge());

if (! p.getObject().isRunning()) {
System.out.println("Worker for lang: " + worker.getWorkerLangauge() + " stopped working, needs to respawn it");
throw new RuntimeException("Worker for lang: " + worker.getWorkerLangauge() + " stopped working, needs to respawn it");
}
}

@Override
public void passivateObject(WorkerParameters worker, PooledObject<WorkerInstance> p)
throws Exception {
System.out.println("Passivating worker for lang: " + worker.getWorkerLangauge());
}

}

class WorkersPool extends GenericKeyedObjectPool<WorkerParameters, WorkerInstance> {

public WorkersPool(KeyedPooledObjectFactory<WorkerParameters, WorkerInstance> factory) {
super(factory);
}
}

关于java - 进程池的应用程序级负载平衡器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25259873/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com