- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我们有传统的 C++ 单体软件,其作用类似于请求-回复 TCP 服务器。该软件是单线程的,可以同时处理一个请求。目前,我们有固定的此类流程池来并行服务多个客户端。
由于消息量很大,客户端会定期遇到请求处理的严重延迟。目前我们有一个想法通过在客户和工作人员之间引入一种代理来解决这个问题:
我们希望此代理具有以下功能:
事实上,我们希望它的行为类似于 Java 中的 ExecutorService,但使用工作进程而不是线程。目前的想法是基于 Jetty 或 Tomcat 服务器在 Java 中实现这个平衡器,内部消息队列和 servlet 将请求转发给工作进程。
但我想知道:是否存在可以自动执行此过程的现有解决方案(最好是 Java)?实现此类代理的最简单方法是什么?
更新:
我对请求上下文所做的工作 - 好吧,那个 C++ 服务器确实是一个困惑的软件。事实上,每次它接收到不同的上下文时,它都会相应地更新内部缓存以匹配该上下文。例如,如果您请求该服务器为您提供一些英文数据,那么它会将内部缓存重新加载为英文。如果下一个请求是法语的,那么它会再次重新加载缓存。显然,我想通过更智能地转发请求来最大程度地减少缓存重新加载的次数。
通信协议(protocol)是自制的(基于 TCP/IP),但从中提取上下文部分相对容易。
目前负载均衡是在客户端实现的,因此每个客户端都配置为知道所有服务器节点并以循环方式向它们发送请求。这种方法存在几个问题:客户端的连接管理复杂,与彼此不认识的多个客户端一起工作不正确,无法管理节点生命周期。我们无法通过重构解决列出的问题。
很可能我们最终会使用自制的转发解决方案,但我仍然想知道是否有至少用于流程管理的现有产品?理想情况下,这将是 Java 应用程序服务器,它可以:
也许这个功能已经在一些现有的应用服务器中实现了?这将大大简化问题!
最佳答案
关于流程管理,您可以通过混合 Apache Commons Exec 的功能轻松实现您的目标库可以帮助产生新的 worker 实例 Apache Commons Pool将管理正在运行的实例的库。
实现非常简单,因为公共(public)池将确保您一次可以使用一个对象,直到它返回到池中。如果对象没有返回到池中,公共(public)池将为您生成新实例。您可以通过添加看门狗服务(来自 apache commons exec)来控制工作人员的生命周期 - 看门狗可以杀死一段时间未使用的实例,或者您也可以使用公共(public)池本身,例如通过调用 pool.clearOldest()。您还可以通过调用 pool.getNumActive() 查看当前处理了多少请求(有多少工作人员处于 Activity 状态)。引用 GenericKeyedObjectPool 的 javadoc 以查看更多内容。
可以通过在 Tomcat 服务器上运行一个简单的 servlet 来完成实现。这个 servlet 将实例化池并通过调用 pool.borowObject(parameters) 简单地向池请求新的 worker。在参数内部,您可以定义您的工作人员应该具有哪些特征来处理请求(在您的情况下,参数应该包括语言)。如果没有这样的工作人员可用(例如没有法语工作人员)池将为您生成新的工作人员。此外,如果有一个工作人员但该工作人员当前正在处理另一个请求,则池也会为您生成一个新工作人员(因此您将有两个工作人员处理相同的语言)。当您调用 pool.returnObject(parameters, instance) 时,Worker 将准备好处理新请求。
整个实现只用了不到 200 行代码(完整代码见下文)。该代码包括工作进程从外部被杀死或崩溃的情况(请参阅 WorkersFactory.activateObject())。
恕我直言:使用 Apache Cammel 对您来说不是一个好的选择,因为它太大了,而且它被设计为不同消息格式之间的中介总线。你不需要做转换,你不需要处理不同格式的消息。寻求简单的解决方案。
package com.myapp;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
import org.apache.commons.pool2.KeyedPooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.Objects;
public class BalancingServlet extends javax.servlet.http.HttpServlet {
private final WorkersPool workersPool;
public BalancingServlet() {
workersPool = new WorkersPool(new WorkersFactory());
}
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
}
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
response.getWriter().println("Balancing");
String language = request.getParameter("language");
String someOtherParam = request.getParameter("other");
WorkerParameters workerParameters = new WorkerParameters(language, someOtherParam);
String requestSpecificParam1 = request.getParameter("requestParam1");
String requestSpecificParam2 = request.getParameter("requestParam2");
try {
WorkerInstance workerInstance = workersPool.borrowObject(workerParameters);
workerInstance.handleRequest(requestSpecificParam1, requestSpecificParam2);
workersPool.returnObject(workerParameters, workerInstance);
} catch (Exception e) {
e.printStackTrace();
}
}
}
class WorkerParameters {
private final String workerLangauge;
private final String someOtherParam;
WorkerParameters(String workerLangauge, String someOtherParam) {
this.workerLangauge = workerLangauge;
this.someOtherParam = someOtherParam;
}
public String getWorkerLangauge() {
return workerLangauge;
}
public String getSomeOtherParam() {
return someOtherParam;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
WorkerParameters that = (WorkerParameters) o;
return Objects.equals(this.workerLangauge, that.workerLangauge) && Objects.equals(this.someOtherParam, that.someOtherParam);
}
@Override
public int hashCode() {
return Objects.hash(workerLangauge, someOtherParam);
}
}
class WorkerInstance {
private final Thread thread;
private WorkerParameters workerParameters;
public WorkerInstance(final WorkerParameters workerParameters) {
this.workerParameters = workerParameters;
// launch the process here
System.out.println("Spawing worker for language: " + workerParameters.getWorkerLangauge());
// use commons Exec to spawn your process using command line here
// something like
thread = new Thread(new Runnable() {
@Override
public void run() {
try {
String line = "C:/Windows/notepad.exe" ;
final CommandLine cmdLine = CommandLine.parse(line);
final DefaultExecutor executor = new DefaultExecutor();
executor.setExitValue(0);
// ExecuteWatchdog watchdog = new ExecuteWatchdog(60000); // if you want to kill process running too long
// executor.setWatchdog(watchdog);
int exitValue = executor.execute(cmdLine);
System.out.println("process finished with exit code: " + exitValue);
} catch (IOException e) {
throw new RuntimeException("Problem while executing application for language: " + workerParameters.getWorkerLangauge(), e);
}
}
});
thread.start();
System.out.println("Process spawned for language: " + workerParameters.getWorkerLangauge());
}
public void handleRequest(String someRequestParam1, String someRequestParam2) {
System.out.println("Handling request for extra params: " + someRequestParam1 + ", " + someRequestParam2);
// communicate with your application using parameters here
// communcate via tcp or whatever protovol you want using extra parameters: someRequestParam1, someRequestParam2
}
public boolean isRunning() {
return thread.isAlive();
}
}
class WorkersFactory extends BaseKeyedPooledObjectFactory<WorkerParameters, WorkerInstance> {
@Override
public WorkerInstance create(WorkerParameters parameters) throws Exception {
return new WorkerInstance(parameters);
}
@Override
public PooledObject<WorkerInstance> wrap(WorkerInstance worker) {
return new DefaultPooledObject<WorkerInstance>(worker);
}
@Override
public void activateObject(WorkerParameters worker, PooledObject<WorkerInstance> p)
throws Exception {
System.out.println("Activating worker for lang: " + worker.getWorkerLangauge());
if (! p.getObject().isRunning()) {
System.out.println("Worker for lang: " + worker.getWorkerLangauge() + " stopped working, needs to respawn it");
throw new RuntimeException("Worker for lang: " + worker.getWorkerLangauge() + " stopped working, needs to respawn it");
}
}
@Override
public void passivateObject(WorkerParameters worker, PooledObject<WorkerInstance> p)
throws Exception {
System.out.println("Passivating worker for lang: " + worker.getWorkerLangauge());
}
}
class WorkersPool extends GenericKeyedObjectPool<WorkerParameters, WorkerInstance> {
public WorkersPool(KeyedPooledObjectFactory<WorkerParameters, WorkerInstance> factory) {
super(factory);
}
}
关于java - 进程池的应用程序级负载平衡器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25259873/
最近,我们将专用 SQL 池部署到生产中的 Synapse 工作区。在开发中,我们可以访问无服务器 SQL 池和专用 SQL 池。但是,在生产中,我们可以访问无服务器 SQL 池,但无法访问专用 SQ
假设您从一个项目公开 WCF 服务,并使用“添加服务引用”(在本例中为 Framework 3.5 WPF 应用程序)在另一个项目中使用它。 当您重新实例化 ClientBase 派生代理时,Clie
我有一个函数,它使用 multiprocessing.Pool 并行处理一个数据集中的所有数据。 from multiprocessing import Pool ... def func():
我正在尝试使用进程对象在 python 中使用工作池。每个 worker (一个进程)进行一些初始化(花费大量时间),传递一系列作业(理想情况下使用 map()),并返回一些东西。除此之外,不需要任何
我是软件工程师,最近我构建了我的 Linux 机器,想探索更多系统管理员类型的任务。我已经探索并阅读了很多关于 ZFS 的内容,但我越来越困惑,因为每篇文章对它的描述都不一样。 Everything
我有 zfs 池: $ sudo zpool status lxd pool: lxd state: ONLINE scan: none requested config: NAME
我有一个基于 Actor 的项目,对于其中的一部分,我必须使用一些接收消息的 Actor ,然后一个 Actor 分别分配给每个请求,每个 Actor 负责执行其消息请求,所以我需要类似线程的东西我的
我已经使用 QEMU 模拟器成功地将 FreeBSD 安装到原始图像文件中。我已经使用 ZFS 文件系统 (ZFS POOL) 格式化了图像文件。 使用下面的命令我已经成功地挂载了准备好由 zpool
我正在使用 multiprocessor.Pool并行处理一些文件。该代码等待接收文件,然后使用 Pool.apply_async 将该文件发送给工作人员。 ,然后处理文件。 这段代码应该一直在运行,
我正在使用带有光滑的 Bonecp 数据源。并发现池包含关闭的连接所以我总是遇到这个异常 java.sql.SQLException: Connection is closed! at com
我有apartment gem的 Multi-Tenancy Rails应用程序,我可以使用apartment-sidekiq在每个工作程序中成功切换数据库租户。但是,sidekiq worker 正
ZFS 池可能由数据集(文件系统、快照等)或卷组成。 ZFS 卷就像 block 设备,但我不明白池和文件系统之间的区别。当我通过 zpool create pool1 sda sdb sdc 创建
我在 docker 容器上运行了 airflow。我正在使用 airflow 2.0.2 版。 我知道我实际上可以通过 UI 创建池。但我正在寻找一种通过 pools.json 文件在 docker
我在tomcat中有一个jdbc池,用于建立数据库连接。我在使用后没有显式关闭连接对象。我的“maxActive”参数设置为100。应用程序运行了一段时间,但随后失败进行数据库查询。它会等待无限时间来
阅读 PostgreSQL 文档 here我读了以下内容: As well, connections requested for users other than the default config
我在 docker 容器上运行了 airflow。我正在使用 airflow 2.0.2 版。 我知道我实际上可以通过 UI 创建池。但我正在寻找一种通过 pools.json 文件在 docker
我正在读取一个大的 URL 文件并向服务发出请求。该请求由返回 ListenableFuture 的客户端执行。现在我想保留一个 ListenableFuture 池,例如最多同时执行 N 个 Fut
我想使用队列来保存结果,因为我希望消费者(串行而不是并行)在工作人员产生结果时处理工作人员的结果。 现在,我想知道为什么以下程序挂起。 import multiprocessing as mp imp
我正在开发一个单页应用程序,目前正在构建一个 JQuery、ajax 函数,以便我的所有调用都能通过。 对于一个典型的页面,我可能有 3 个 ajax 调用。我的想法是,如果用户互联网出去将这些 aj
我有一个单位类及其一些子类(弓箭手、剑客等)。我怎样才能创建一个回收所有单元类型子类的池? 最佳答案 这是不可能的,因为池只能包含一种特定类型的对象。否则你可能会遇到这样的情况: Pool unitP
我是一名优秀的程序员,十分优秀!