- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我需要通过 Java 的标准输入将消息传递给 CLI PHP 进程。我想在一个池中运行大约 20 个 PHP 进程,这样当我将一条消息传递到池中时,它会将每条消息发送到一个单独的线程,从而保持要传递的消息队列。我希望这些 PHP 进程尽可能长时间地保持 Activity 状态,如果其中一个进程死亡,则会启动一个新进程。我看着用静态线程池来做这件事,但它似乎更适合执行并简单地死掉的任务。我怎么能用一个简单的界面将消息传递到池中呢?我是否必须实现自己的自定义“线程池”?
最佳答案
我正在为此提供一些代码,因为我认为它会让事情变得更清楚。基本上你需要保留一个进程对象池。请注意,这些流程中的每一个都有您需要以某种方式管理的输入、输出和错误流。在我的示例中,我只是将错误和输出重定向到主进程控制台。如果需要,您可以设置回调和处理程序以获取 PHP 程序的输出。如果您只是处理任务而不关心 PHP 的内容,那么请保持原样或重定向到文件。
我正在使用 Apache Commons Pool对象池的库。无需重新发明一个。
您将拥有一个由 20 个运行 PHP 程序的进程组成的池。仅此一项无法满足您的需求。您可能希望“同时”针对所有 20 个进程处理任务。因此,您还需要一个 ThreadPool 来从您的 ObjectPool 中提取一个进程。
您还需要了解,如果您终止或按 CTRL-C 键控制您的 Java 进程,init
进程将接管您的 php 进程,并且它们将坐在那里。您可能希望保留您生成的 PHP 进程的所有 pid 的日志,然后在您重新运行 Java 程序时清除它们。
public class StackOverflow_10037379 {
private static Logger sLogger = Logger.getLogger(StackOverflow_10037379.class.getName());
public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> {
private String mProcessToRun;
public CLIPoolableObjectFactory(String processToRun) {
mProcessToRun = processToRun;
}
@Override
public Process makeObject() throws Exception {
ProcessBuilder builder = new ProcessBuilder();
builder.redirectError(Redirect.INHERIT);
// I am being lazy, but really the InputStream is where
// you can get any output of the PHP Process. This setting
// will make it output to the current processes console.
builder.redirectOutput(Redirect.INHERIT);
builder.redirectInput(Redirect.PIPE);
builder.command(mProcessToRun);
return builder.start();
}
@Override
public boolean validateObject(Process process) {
try {
process.exitValue();
return false;
} catch (IllegalThreadStateException ex) {
return true;
}
}
@Override
public void destroyObject(Process process) throws Exception {
// If PHP has a way to stop it, do that instead of destroy
process.destroy();
}
@Override
public void passivateObject(Process process) throws Exception {
// Should really try to read from the InputStream of the Process
// to prevent lock-ups if Rediret.INHERIT is not used.
}
}
public static class CLIWorkItem implements Runnable {
private ObjectPool<Process> mPool;
private String mWork;
public CLIWorkItem(ObjectPool<Process> pool, String work) {
mPool = pool;
mWork = work;
}
@Override
public void run() {
Process workProcess = null;
try {
workProcess = mPool.borrowObject();
OutputStream os = workProcess.getOutputStream();
os.write(mWork.getBytes(Charset.forName("UTF-8")));
os.flush();
// Because of the INHERIT rule with the output stream
// the console stream overwrites itself. REMOVE THIS in production.
Thread.sleep(100);
} catch (Exception ex) {
sLogger.log(Level.SEVERE, null, ex);
} finally {
if (workProcess != null) {
try {
// Seriously.. so many exceptions.
mPool.returnObject(workProcess);
} catch (Exception ex) {
sLogger.log(Level.SEVERE, null, ex);
}
}
}
}
}
public static void main(String[] args) throws Exception {
// Change the 5 to 20 in your case.
// Also change mock_php.exe to /usr/bin/php or wherever.
ObjectPool<Process> pool =
new GenericObjectPool<>(
new CLIPoolableObjectFactory("mock_php.exe"), 5);
// This will only allow you to queue 100 work items at a time. I would suspect
// that if you only want 20 PHP processes running at a time and this queue
// filled up you'll need to implement some other strategy as you are doing
// more work than PHP can keep up with. You'll need to block at some point
// or throw work away.
BlockingQueue<Runnable> queue =
new ArrayBlockingQueue<>(100, true);
ThreadPoolExecutor executor =
new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue);
// print some stuff out.
executor.execute(new CLIWorkItem(pool, "Message 1\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 2\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 3\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 4\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 5\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 6\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 7\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 8\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 9\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 10\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 11\r\n"));
executor.shutdown();
executor.awaitTermination(4000, TimeUnit.HOURS);
pool.close();
}
}
程序运行的输出:
12172 - Message 2
10568 - Message 1
4804 - Message 3
11916 - Message 4
11116 - Message 5
12172 - Message 6
4804 - Message 7
10568 - Message 8
11916 - Message 9
11116 - Message 10
12172 - Message 11
仅输出输入内容的 C++ 程序代码:
#include <windows.h>
#include <iostream>
#include <string>
int main(int argc, char* argv[])
{
DWORD pid = GetCurrentProcessId();
std::string line;
while (true) {
std::getline (std::cin, line);
std::cout << pid << " - " << line << std::endl;
}
return 0;
}
更新
抱歉耽搁了。这是供任何感兴趣的人使用的 JDK 6 版本。您必须运行一个单独的线程来从进程的 InputStream 读取所有输入。我已将此代码设置为在每个新进程旁边生成一个新线程。只要该线程还活着,它就会始终从该进程中读取数据。我没有直接输出到文件,而是将其设置为使用日志记录框架。通过这种方式,您可以设置日志记录配置以转到文件、翻转、转到控制台等,而无需硬编码以转到文件。
你会注意到我只为每个进程启动一个 Gobbler,即使一个进程有 stdout 和 stderr。我将 stderr 重定向到 stdout 只是为了让事情变得更容易。显然jdk6只支持这种类型的重定向。
public class StackOverflow_10037379_jdk6 {
private static Logger sLogger = Logger.getLogger(StackOverflow_10037379_jdk6.class.getName());
// Shamelessy taken from Google and modified.
// I don't know who the original Author is.
public static class StreamGobbler extends Thread {
InputStream is;
Logger logger;
Level level;
StreamGobbler(String logName, Level level, InputStream is) {
this.is = is;
this.logger = Logger.getLogger(logName);
this.level = level;
}
public void run() {
try {
InputStreamReader isr = new InputStreamReader(is);
BufferedReader br = new BufferedReader(isr);
String line = null;
while ((line = br.readLine()) != null) {
logger.log(level, line);
}
} catch (IOException ex) {
logger.log(Level.SEVERE, "Failed to read from Process.", ex);
}
logger.log(
Level.INFO,
String.format("Exiting Gobbler for %s.", logger.getName()));
}
}
public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> {
private String mProcessToRun;
public CLIPoolableObjectFactory(String processToRun) {
mProcessToRun = processToRun;
}
@Override
public Process makeObject() throws Exception {
ProcessBuilder builder = new ProcessBuilder();
builder.redirectErrorStream(true);
builder.command(mProcessToRun);
Process process = builder.start();
StreamGobbler loggingGobbler =
new StreamGobbler(
String.format("process.%s", process.hashCode()),
Level.INFO,
process.getInputStream());
loggingGobbler.start();
return process;
}
@Override
public boolean validateObject(Process process) {
try {
process.exitValue();
return false;
} catch (IllegalThreadStateException ex) {
return true;
}
}
@Override
public void destroyObject(Process process) throws Exception {
// If PHP has a way to stop it, do that instead of destroy
process.destroy();
}
@Override
public void passivateObject(Process process) throws Exception {
// Should really try to read from the InputStream of the Process
// to prevent lock-ups if Rediret.INHERIT is not used.
}
}
public static class CLIWorkItem implements Runnable {
private ObjectPool<Process> mPool;
private String mWork;
public CLIWorkItem(ObjectPool<Process> pool, String work) {
mPool = pool;
mWork = work;
}
@Override
public void run() {
Process workProcess = null;
try {
workProcess = mPool.borrowObject();
OutputStream os = workProcess.getOutputStream();
os.write(mWork.getBytes(Charset.forName("UTF-8")));
os.flush();
// Because of the INHERIT rule with the output stream
// the console stream overwrites itself. REMOVE THIS in production.
Thread.sleep(100);
} catch (Exception ex) {
sLogger.log(Level.SEVERE, null, ex);
} finally {
if (workProcess != null) {
try {
// Seriously.. so many exceptions.
mPool.returnObject(workProcess);
} catch (Exception ex) {
sLogger.log(Level.SEVERE, null, ex);
}
}
}
}
}
public static void main(String[] args) throws Exception {
// Change the 5 to 20 in your case.
ObjectPool<Process> pool =
new GenericObjectPool<Process>(
new CLIPoolableObjectFactory("mock_php.exe"), 5);
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(100, true);
ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue);
// print some stuff out.
executor.execute(new CLIWorkItem(pool, "Message 1\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 2\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 3\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 4\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 5\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 6\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 7\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 8\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 9\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 10\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 11\r\n"));
executor.shutdown();
executor.awaitTermination(4000, TimeUnit.HOURS);
pool.close();
}
}
输出
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 9440 - Message 3
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 8776 - Message 2
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 6100 - Message 1
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 10096 - Message 4
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 8868 - Message 5
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 8868 - Message 8
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 6100 - Message 10
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 8776 - Message 9
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 10096 - Message 6
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 9440 - Message 7
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 6100 - Message 11
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.295131993.
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.756434719.
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.332711452.
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.1981440623.
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.1043636732.
关于java - CLI 进程的线程池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10037379/
我在全局范围内安装了最新的 angular-cli,并且我的项目正在成功构建。 在阅读另一个问题的建议解决方案时,(https://github.com/angular/angular-cli/iss
根据official docs关于 .angular-cli.json 配置文件,仅支持 main、test、polyfills 作为 webpack 入口文件。 如何通过定义 .angular-cl
我想在s3存储桶中搜索文件名abc.zip,并且有将近60个存储桶,每个存储桶都有2到3个级别的子目录或文件夹。我尝试使用AWS CLI命令执行搜索,以下是我尝试过的命令,但即使文件存在于存储桶中,也
我正在尝试在 C++/CLI 中查找赋值运算符和复制构造函数的示例。我在 Google 上花了很多时间,但令人惊讶的是我找不到一个看起来很常见的例子。 最佳答案 .NET 语义没有复制构造函数或赋值运
我需要获取另一个窗口的句柄。 这是它的代码: private: System::Void btn_find_Click(System::Object^ sender, System::EventAr
有点好奇实际上有多少人使用 C++/CLI?那么有多少人在使用它呢?人们是否只在托管环境下寻找程序员的特定工作?围绕 c++/cli 的社区有多大?从我目前看到的情况来看,有不少语法变化,作为 C++
我正在阅读一些 C++/CLI Material ,并且遇到了文字字段的概念: literal int inchesPerFoot = 12; 这是否比 const 更可取,因为 const FIEL
通过 npm install -g angular-cli@latest 安装 Angular CLI (angular-cli@1.0.0-beta.16) 失败,出现以下错误 npm-debug.
我正在尝试在我的 Ember CLI 应用程序中配置适配器以根据环境使用不同的主机。在开发中,我希望它是默认的当前主机(让我通过 --proxy 选项自定义它,但在生产中我知道它将是 http://s
我最近开始使用 Angular/CLI 工具,我在执行文件时遇到了一个问题,那就是我运行时 ng serve 那么这个命令可以帮助我们在源文件中进行任何更改时自动重新加载站点,但在我的系统中它没有发生
是否有推荐的方法通过 c++ cli 包装 native c++ 库? 最佳答案 不确定是否一种尺寸适合所有人,但是,是的,这很大程度上是一个机械过程。您的 ref 类包装器应该声明一个私有(priv
我有一个关于为要在 C# 中使用的 native C++ 类创建 C++ CLI 包装器的问题。 这是一个示例代码: #include "stdafx.h" #pragma once using na
下面的代码打印 0 和 3。这是代码生成错误吗?我在 .NET 4.0 下使用 Visual Studio 2012 Update 3 RC 运行它 #include "stdafx.h" using
它是如何工作的?它是否有不同的部分 - 有些方法是托管的,有些是非托管的,它是否将每个方法都转换为托管的,试图保持所有东西都处于托管状态并在必须时进行互操作调用? 最佳答案 三种不同的compiler
如果我没记错的话,函数必须是 CLR 世界中类的成员,而全局函数在 C++/CLI 中是可能的。这是否意味着这些全局函数是某种隐藏的“全局”类的一部分?如果是这样,出于反射目的,人们将如何获取其类型?
如何在 Angular 中重建我的项目。我首先使用 ng build 构建它,但无法再次执行该命令,因为它不会让我更改文件夹。 我收到此错误消息: EPERM:不允许操作,lstat 是否有另一个命令
我遇到了两个相互引用的类的问题。我曾尝试使用接口(interface)来解决问题,但遇到了其他问题,例如类重新定义。我只是不确定如何正确执行此操作。 这是我正在做的事情的一个例子。注意:我已经去掉了所
我是 React.js 的新手,我正在尝试从 tutorialspoint 上的教程中学习但我遇到了错误。这是我执行 npm start 命令时控制台上的错误: C:\Users\HP\Desktop
我正在尝试将我的 angular cli 和 angular core 从 12 升级到 13,但看起来存在对等依赖性问题。有人遇到过这个问题吗? npx @angular/cli@13 update
我正在我的 ubuntu 上安装 influxdb_2.0.9,我按照这里的说明操作: https://docs.influxdata.com/influxdb/v2.0/install/?t=Lin
我是一名优秀的程序员,十分优秀!