- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我们有传统的 C++ 单体软件,其作用类似于请求-回复 TCP 服务器。该软件是单线程的,可以同时处理一个请求。目前,我们有固定的此类流程池来并行服务多个客户端。
由于消息量很大,客户端会定期遇到请求处理的严重延迟。目前我们有一个想法通过在客户和工作人员之间引入一种代理来解决这个问题:
我们希望此代理具有以下功能:
事实上,我们希望它的行为类似于 Java 中的 ExecutorService,但使用工作进程而不是线程。目前的想法是基于 Jetty 或 Tomcat 服务器在 Java 中实现这个平衡器,内部消息队列和 servlet 将请求转发给工作进程。
但我想知道:是否存在可以自动执行此过程的现有解决方案(最好是 Java)?实现此类代理的最简单方法是什么?
更新:
我对请求上下文所做的工作 - 好吧,那个 C++ 服务器确实是一个困惑的软件。事实上,每次它接收到不同的上下文时,它都会相应地更新内部缓存以匹配该上下文。例如,如果您请求该服务器为您提供一些英文数据,那么它会将内部缓存重新加载为英文。如果下一个请求是法语的,那么它会再次重新加载缓存。显然,我想通过更智能地转发请求来最大程度地减少缓存重新加载的次数。
通信协议(protocol)是自制的(基于 TCP/IP),但从中提取上下文部分相对容易。
目前负载均衡是在客户端实现的,因此每个客户端都配置为知道所有服务器节点并以循环方式向它们发送请求。这种方法存在几个问题:客户端的连接管理复杂,与彼此不认识的多个客户端一起工作不正确,无法管理节点生命周期。我们无法通过重构解决列出的问题。
很可能我们最终会使用自制的转发解决方案,但我仍然想知道是否有至少用于流程管理的现有产品?理想情况下,这将是 Java 应用程序服务器,它可以:
也许这个功能已经在一些现有的应用服务器中实现了?这将大大简化问题!
最佳答案
关于流程管理,您可以通过混合 Apache Commons Exec 的功能轻松实现您的目标库可以帮助产生新的 worker 实例 Apache Commons Pool将管理正在运行的实例的库。
实现非常简单,因为公共(public)池将确保您一次可以使用一个对象,直到它返回到池中。如果对象没有返回到池中,公共(public)池将为您生成新实例。您可以通过添加看门狗服务(来自 apache commons exec)来控制工作人员的生命周期 - 看门狗可以杀死一段时间未使用的实例,或者您也可以使用公共(public)池本身,例如通过调用 pool.clearOldest()。您还可以通过调用 pool.getNumActive() 查看当前处理了多少请求(有多少工作人员处于 Activity 状态)。引用 GenericKeyedObjectPool 的 javadoc 以查看更多内容。
可以通过在 Tomcat 服务器上运行一个简单的 servlet 来完成实现。这个 servlet 将实例化池并通过调用 pool.borowObject(parameters) 简单地向池请求新的 worker。在参数内部,您可以定义您的工作人员应该具有哪些特征来处理请求(在您的情况下,参数应该包括语言)。如果没有这样的工作人员可用(例如没有法语工作人员)池将为您生成新的工作人员。此外,如果有一个工作人员但该工作人员当前正在处理另一个请求,则池也会为您生成一个新工作人员(因此您将有两个工作人员处理相同的语言)。当您调用 pool.returnObject(parameters, instance) 时,Worker 将准备好处理新请求。
整个实现只用了不到 200 行代码(完整代码见下文)。该代码包括工作进程从外部被杀死或崩溃的情况(请参阅 WorkersFactory.activateObject())。
恕我直言:使用 Apache Cammel 对您来说不是一个好的选择,因为它太大了,而且它被设计为不同消息格式之间的中介总线。你不需要做转换,你不需要处理不同格式的消息。寻求简单的解决方案。
package com.myapp;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
import org.apache.commons.pool2.KeyedPooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.Objects;
public class BalancingServlet extends javax.servlet.http.HttpServlet {
private final WorkersPool workersPool;
public BalancingServlet() {
workersPool = new WorkersPool(new WorkersFactory());
}
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
}
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
response.getWriter().println("Balancing");
String language = request.getParameter("language");
String someOtherParam = request.getParameter("other");
WorkerParameters workerParameters = new WorkerParameters(language, someOtherParam);
String requestSpecificParam1 = request.getParameter("requestParam1");
String requestSpecificParam2 = request.getParameter("requestParam2");
try {
WorkerInstance workerInstance = workersPool.borrowObject(workerParameters);
workerInstance.handleRequest(requestSpecificParam1, requestSpecificParam2);
workersPool.returnObject(workerParameters, workerInstance);
} catch (Exception e) {
e.printStackTrace();
}
}
}
class WorkerParameters {
private final String workerLangauge;
private final String someOtherParam;
WorkerParameters(String workerLangauge, String someOtherParam) {
this.workerLangauge = workerLangauge;
this.someOtherParam = someOtherParam;
}
public String getWorkerLangauge() {
return workerLangauge;
}
public String getSomeOtherParam() {
return someOtherParam;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
WorkerParameters that = (WorkerParameters) o;
return Objects.equals(this.workerLangauge, that.workerLangauge) && Objects.equals(this.someOtherParam, that.someOtherParam);
}
@Override
public int hashCode() {
return Objects.hash(workerLangauge, someOtherParam);
}
}
class WorkerInstance {
private final Thread thread;
private WorkerParameters workerParameters;
public WorkerInstance(final WorkerParameters workerParameters) {
this.workerParameters = workerParameters;
// launch the process here
System.out.println("Spawing worker for language: " + workerParameters.getWorkerLangauge());
// use commons Exec to spawn your process using command line here
// something like
thread = new Thread(new Runnable() {
@Override
public void run() {
try {
String line = "C:/Windows/notepad.exe" ;
final CommandLine cmdLine = CommandLine.parse(line);
final DefaultExecutor executor = new DefaultExecutor();
executor.setExitValue(0);
// ExecuteWatchdog watchdog = new ExecuteWatchdog(60000); // if you want to kill process running too long
// executor.setWatchdog(watchdog);
int exitValue = executor.execute(cmdLine);
System.out.println("process finished with exit code: " + exitValue);
} catch (IOException e) {
throw new RuntimeException("Problem while executing application for language: " + workerParameters.getWorkerLangauge(), e);
}
}
});
thread.start();
System.out.println("Process spawned for language: " + workerParameters.getWorkerLangauge());
}
public void handleRequest(String someRequestParam1, String someRequestParam2) {
System.out.println("Handling request for extra params: " + someRequestParam1 + ", " + someRequestParam2);
// communicate with your application using parameters here
// communcate via tcp or whatever protovol you want using extra parameters: someRequestParam1, someRequestParam2
}
public boolean isRunning() {
return thread.isAlive();
}
}
class WorkersFactory extends BaseKeyedPooledObjectFactory<WorkerParameters, WorkerInstance> {
@Override
public WorkerInstance create(WorkerParameters parameters) throws Exception {
return new WorkerInstance(parameters);
}
@Override
public PooledObject<WorkerInstance> wrap(WorkerInstance worker) {
return new DefaultPooledObject<WorkerInstance>(worker);
}
@Override
public void activateObject(WorkerParameters worker, PooledObject<WorkerInstance> p)
throws Exception {
System.out.println("Activating worker for lang: " + worker.getWorkerLangauge());
if (! p.getObject().isRunning()) {
System.out.println("Worker for lang: " + worker.getWorkerLangauge() + " stopped working, needs to respawn it");
throw new RuntimeException("Worker for lang: " + worker.getWorkerLangauge() + " stopped working, needs to respawn it");
}
}
@Override
public void passivateObject(WorkerParameters worker, PooledObject<WorkerInstance> p)
throws Exception {
System.out.println("Passivating worker for lang: " + worker.getWorkerLangauge());
}
}
class WorkersPool extends GenericKeyedObjectPool<WorkerParameters, WorkerInstance> {
public WorkersPool(KeyedPooledObjectFactory<WorkerParameters, WorkerInstance> factory) {
super(factory);
}
}
关于java - 进程池的应用程序级负载平衡器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25259873/
我是 Linux 的新手,并且继承了保持我们的单一 Linux 服务器运行的职责。这是我们的SVN服务器,所以比较重要。 原来在我之前维护它的人有一个 cron 任务,当有太多 svnserve 进程
Node 虽然自身存在多个线程,但是运行在 v8 上的 JavaScript 是单线程的。Node 的 child_process 模块用于创建子进程,我们可以通过子进程充分利用 CPU。范例:
Jenkins 有这么多进程处于事件状态是否正常? 我检查了我的设置,我只配置了 2 个“执行者”... htop http://d.pr/i/RZzG+ 最佳答案 您不仅要限制 Master 中的执
我正在尝试在 scala 中运行这样的 bash 命令: cat "example file.txt" | grep abc Scala 有一个特殊的流程管道语法,所以这是我的第一个方法: val f
很难说出这里要问什么。这个问题模棱两可、含糊不清、不完整、过于宽泛或夸夸其谈,无法以目前的形式得到合理的回答。如需帮助澄清此问题以便重新打开,visit the help center . 关闭 1
我需要一些帮助来理解并发编程的基础知识。事实上,我读得越多,就越感到困惑。因此,我理解进程是顺序执行的程序的一个实例,并且它可以由一个或多个线程组成。在单核CPU中,一次只能执行一个线程,而在多核CP
我的问题是在上一次集成测试后服务器进程没有关闭。 在integration.rs中,我有: lazy_static! { static ref SERVER: Arc> = {
我正在使用 Scala scala.sys.process图书馆。 我知道我可以用 ! 捕获退出代码和输出 !!但是如果我想同时捕获两者呢? 我看过这个答案 https://stackoverflow
我正在开发一个C++类(MyClass.cpp),将其编译为动态共享库(MyClass.so)。 同一台Linux计算机上运行的两个不同应用程序将使用此共享库。 它们是两个不同的应用程序。它不是多线程
我在我的 C 程序中使用 recvfrom() 从多个客户端接收 UDP 数据包,这些客户端可以使用自定义用户名登录。一旦他们登录,我希望他们的用户名与唯一的客户端进程配对,这样服务器就可以通过数据包
如何更改程序,以便函数 function_delayed_1 和 function_delayed_2 仅同时执行一次: int main(int argc, char *argv[]) {
考虑这两个程序: //in #define MAX 50 int main(int argc, char* argv[]) { int *count; int fd=shm
请告诉我如何一次打开三个终端,这样我的项目就可以轻松执行,而不必打开三个终端三次然后运行三个exe文件。请问我们如何通过脚本来做到这一点,即打开三个终端并执行三个 exe 文件。 最佳答案 在后台运行
我编写了一个监控服务来跟踪一组进程,并在服务行为异常、内存使用率高、超出 CPU 运行时间等时发出通知。 这在我的本地计算机上运行良好,但我需要它指向远程机器并获取这些机器上的进程信息。 我的方法,在
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 想改进这个问题?将问题更新为 on-topic对于堆栈溢出。 8年前关闭。 Improve this qu
我有一个允许用户上传文件的应用程序。上传完成后,必须在服务器上完成许多处理步骤(解压、存储、验证等...),因此稍后会在一切完成后通过电子邮件通知用户。 我见过很多示例,其中 System.Compo
这个问题对很多人来说可能听起来很愚蠢,但我想对这个话题有一个清晰的理解。例如:当我们在 linux(ubuntu, x86) 上构建一个 C 程序时,它会在成功编译和链接过程后生成 a.out。 a.
ps -eaf | grep java 命令在这里不是识别进程是否是 java 进程的解决方案,因为执行此命令后我的许多 java 进程未在输出中列出。 最佳答案 简答(希望有人写一个更全面的): 获
我有几个与内核态和用户态的 Windows 进程相关的问题。 如果我有一个 hello world 应用程序和一个暴露新系统调用 foo() 的 hello world 驱动程序,我很好奇在内核模式下
我找不到很多关于 Windows 中不受信任的完整性级别的信息,对此有一些疑问: 是否有不受信任的完整性级别进程可以创建命名对象的地方? (互斥锁、事件等) 不受信任的完整性级别进程是否应该能够打开一
我是一名优秀的程序员,十分优秀!