gpt4 book ai didi

java - 将处理后的消息发送到特定线程

转载 作者:行者123 更新时间:2023-12-03 12:57:09 26 4
gpt4 key购买 nike

我有一组线程,其中每个线程都必须等待其所需的输入,进行一些计算,最后将其输出值发送到特定线程。

我计划拥有包含线程名称和线程本身的全局映射,以便让每个线程按名称获取其“后继”线程,然后将值发送给它们。

首先,我查看了使用阻塞队列的生产者-消费者示例:

class Consumer implements Runnable {
private final BlockingQueue queue;

Consumer(BlockingQueue q) {
queue = q;
}

public void run() {
try {
while(true) {
System.out.println("Waiting for input");
consume(queue.take());
}
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}

void consume(Object x) {
System.out.println("Received: " + x);
}
}

class Setup {
public static void main(String...args) {
BlockingQueue q = new ArrayBlockingQueue<String>(10);
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}

我以为我可以为每个线程设置一个阻塞队列。然后,消费者线程将在 queue.take() 上循环,直到它接收到所有所需的值。

后来发现这个 post ,其中提出了与我类似的问题。提出的解决方案似乎比阻塞队列解决方案更容易:它基于仅在我想要的线程上调用一个方法,因此将消息发送到。

我想请教你一些建议(因为我认为这是一种常见的情况)关于两种方法中哪一种是最好的,或者是否有更好的方法来实现我想要的。

非常感谢您的帮助。

最佳答案

消费者 - 生产者很好。 (你在引用文献中提到的“答案”是一堆蠕虫......仔细想想......)

您可以使用 Queue , Pipe ,甚至 PipedInputStreamPipedOutputStream .还有Exchanger .

这是来自 Exchanger javadoc 的示例的 mod。不要担心嵌套的类,它只是一种紧凑的样式——与主题完全无关。

这里我们有一个“管道”类。它有 2 个线程(名称中的 R/L 指的是左、右)。管道流量为 R->L。

/* 
* mostly based on
* http://download.oracle.com/javase/6/docs/api/java/util/concurrent/Exchanger.html
*/
package so_6936111;

import java.util.concurrent.Exchanger;

public class WorkflowDemo {

public static void main(String[] args) {
Pipeline pipeline = new Pipeline();
pipeline.start();
}
// ----------------------------------------------------------------
// Pipeline
// ----------------------------------------------------------------

public static class Pipeline {

/** exchanger for messages */
Exchanger<Message> exchanger = new Exchanger<Message>();

/* the two message instances that are passed back and forth */
Message msg_1 = new Message();
Message msg_2 = new Message();

/** startups the pipeline */
void start() {
new Thread(new WorkerR()).start();
new Thread(new WorkerL()).start();
}


/** Message objects are passed between workflow threads */
public static class Message {
private Object content;
public Object getContent() { return content; }
public void setContent(Object c) { this.content = c; }
}


/** WorkerR is at the head of the pipeline */
class WorkerR implements Runnable {
public void run() {
Message message = msg_1;
try {
while (true) {
Object data = doSomeWork();
message.setContent(data);
message = exchanger.exchange(message);
}
} catch (InterruptedException ex) { ex.printStackTrace();}
}
/**
* let's pretend this is where you get your
* initial data and do some work
*/
private Object doSomeWork() {
return String.format("STEP-1@t:%d", System.nanoTime());
}
}

/** WorkerL is at the tail of the pipeline */
class WorkerL implements Runnable {
public void run() {
Message message = msg_2;
try {
while (true) {
message = exchanger.exchange(message);
Object data = doPostProcessing(message.getContent());
System.out.format("%s\n", data);
}
} catch (InterruptedException ex) { ex.printStackTrace();}
}

/**
* Let's pretend this is where the 2nd step of the workflow.
*/
private Object doPostProcessing(Object data) {
return String.format("%s | STEP-2@t:%d", data, System.nanoTime());
}
}
}
}

输出:
STEP-1@t:1312434325594730000 | STEP-2@t:1312434325594747000
STEP-1@t:1312434325594750000 | STEP-2@t:1312434325594765000
STEP-1@t:1312434325594768000 | STEP-2@t:1312434325594784000
STEP-1@t:1312434325594787000 | STEP-2@t:1312434325594804000
STEP-1@t:1312434325594806000 | STEP-2@t:1312434325594823000
STEP-1@t:1312434325594826000 | STEP-2@t:1312434325594841000
...

关于java - 将处理后的消息发送到特定线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6936111/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com