- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个具有单读者线程模型的多写入线程。ThreadMultipleDateReceiver
类设计用于从多个线程读取。
public class ThreadMultipleDateReceiver extends Thread {
private static final int MAX_CLIENT_THREADS = 4;
private byte[] incomingBytes;
private volatile boolean isRunning;
private volatile List<ThreadStreamDateWriter> lThrdDate;
private static PipedInputStream pipedInputStream;
public ThreadMultipleDateReceiver() {
lThrdDate = Collections.synchronizedList(new ArrayList<>(MAX_CLIENT_THREADS));
pipedInputStream = new PipedInputStream();
System.out.println("ThreadMultipleDateReceiver Created");
}
@Override public void run() {
isRunning = true;
while (isRunning) {
if (!lThrdDate.isEmpty()) {
System.out.println("ThreadMultipleDateReceiver has:" + lThrdDate.size());
for (int i = lThrdDate.size(); i > 0; i--) {
if (lThrdDate.get(i - 1).getState() == Thread.State.TERMINATED) {
lThrdDate.remove(i - 1);
} else {
System.out.println("I ThreadMultipleDateReceiver have:" + lThrdDate.get(i - 1).getNameDateWriter());
}
}
incomingBytes = new byte[1024];
try {
String str = "";
int iRd;
System.out.println("ThreadMultipleDateReceiver waiting:" + str);
while ((iRd = pipedInputStream.read(incomingBytes)) != -1) {
if (iRd > 0) {
str += new String(incomingBytes);
}
}
System.out.println("ThreadMultipleDateReceiver Received:\n\t:" + str);
} catch (IOException e) { }
} else {
System.out.println("ThreadMultipleDateReceiver Empty");
}
}
emptyDateWriters();
}
public void addDateWriter(ThreadStreamDateWriter threadDateWriter) {
if (lThrdDate.size() < MAX_CLIENT_THREADS) {
lThrdDate.add(threadDateWriter);
}
}
private void emptyDateWriters() {
if (!lThrdDate.isEmpty()) {
for (int i = lThrdDate.size(); i > 0; i--) {
ThreadStreamDateWriter threadDateWriter = lThrdDate.get(i - 1);
threadDateWriter.stopThread();
lThrdDate.remove(i - 1);
}
}
}
public PipedInputStream getPipedInputStream() {
return pipedInputStream;
}
public void stopThread() {
isRunning = false;
}
}
以及单个 Writer 线程
public class ThreadStreamDateWriter extends Thread {
String Self;
private byte[] outgoingBytes;
private volatile boolean isRunning;
private static PipedOutputStream pipedOutputStream;
ThreadStreamDateWriter(String name, PipedInputStream snk) {
Self = name;
pipedOutputStream = new PipedOutputStream();
try {
pipedOutputStream.connect(snk);
} catch (IOException e) { }
}
@Override public void run() {
isRunning = true;
while (isRunning) {
try {
outgoingBytes = getInfo().getBytes();
System.out.println("ThreadStreamDateWriter -> write to pipedOutputStream:" + new String(outgoingBytes));
pipedOutputStream.write(outgoingBytes);
System.out.println("ThreadStreamDateWriter -> wrote:" + new String(outgoingBytes));
try { Thread.sleep(4000); } catch (InterruptedException ex) { }
} catch (IOException | NegativeArraySizeException | IndexOutOfBoundsException e) {
isRunning = false;
}
}
}
String getInfo() {
String sDtTm = new SimpleDateFormat("yyyyMMdd-hhmmss").format(Calendar.getInstance().getTime());
return Self + " -> " + sDtTm;
}
public void stopThread() {
isRunning = false;
}
public String getNameDateWriter() {
return Self;
}
}
如何启动(我使用的是 Netbeans)?
ThreadMultipleDateReceiver thrdMDateReceiver = null;
ThreadStreamDateWriter thrdSDateWriter0 = null;
ThreadStreamDateWriter thrdSDateWriter1 = null;
private void jtbDateExchangerActionPerformed(java.awt.event.ActionEvent evt) {
if (jtbDateExchanger.isSelected()) {
if (thrdMDateReceiver == null) {
thrdMDateReceiver = new ThreadMultipleDateReceiver();
thrdMDateReceiver.start();
}
if (thrdSDateWriter0 == null) {
thrdSDateWriter0 = new ThreadStreamDateWriter("-0-", thrdMDateReceiver.getPipedInputStream());
thrdSDateWriter0.start();
thrdMDateReceiver.addDateWriter(thrdSDateWriter0);
}
if (thrdSDateWriter1 == null) {
thrdSDateWriter1 = new ThreadStreamDateWriter("-1-", thrdMDateReceiver.getPipedInputStream());
thrdSDateWriter1.start();
thrdMDateReceiver.addDateWriter(thrdSDateWriter1);
}
} else {
if (thrdMDateReceiver != null) {
thrdMDateReceiver.stopThread();
}
}
}
输出
run:
ThreadMultipleDateReceiver Created
ThreadMultipleDateReceiver Empty
ThreadMultipleDateReceiver Empty
ThreadMultipleDateReceiver Empty
.....
ThreadMultipleDateReceiver Empty
ThreadMultipleDateReceiver Empty
ThreadMultipleDateReceiver Empty
ThreadMultipleDateReceiver has:1
I ThreadMultipleDateReceiver have:-0-
ThreadMultipleDateReceiver waiting:
ThreadStreamDateWriter -> write to pipedOutputStream:-0- -> 20170608-090003
ThreadStreamDateWriter -> write to pipedOutputStream:-1- -> 20170608-090003
BUILD SUCCESSFUL (total time: 1 minute 3 seconds)
ThreadMultipleDateReceiver 被阻止,并且不打印:
ThreadMultipleDateReceiver Received:
-1- -> 20170608-090003
或
ThreadMultipleDateReceiver Received:
-0- -> 20170608-090003
如何解决?
最佳答案
看起来您的管道输出流是静态的,因此每次构造 ThreadStreamDateWriter 时,您都会使用管道输出流的旧值。
尝试将其设为实例变量并将其传递到构造函数中。所以你只有其中之一。
编辑1:我创建了管道实例变量并添加了一些打印输出。现在似乎运行时间更长(见下文):
编辑2:你第二个pipedOutputStream.connect(snk);正在 throw 。您一次只能连接一件事。
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
public class So44438086 {
public static class ThreadMultipleDateReceiver extends Thread {
private static final int MAX_CLIENT_THREADS=4;
private byte[] incomingBytes;
private volatile boolean isRunning;
private volatile List<ThreadStreamDateWriter> lThrdDate;
private /*static*/ PipedInputStream pipedInputStream;
public ThreadMultipleDateReceiver() {
lThrdDate=Collections.synchronizedList(new ArrayList<>(MAX_CLIENT_THREADS));
pipedInputStream=new PipedInputStream();
System.out.println("ctor setting pipedInputStream to: "+pipedInputStream);
System.out.println("ThreadMultipleDateReceiver Created");
}
@Override public void run() {
isRunning=true;
while(isRunning) {
if(!lThrdDate.isEmpty()) {
System.out.println("ThreadMultipleDateReceiver has:"+lThrdDate.size());
for(int i=lThrdDate.size();i>0;i--) {
if(lThrdDate.get(i-1).getState()==Thread.State.TERMINATED) {
lThrdDate.remove(i-1);
} else {
System.out.println("I ThreadMultipleDateReceiver have:"+lThrdDate.get(i-1).getNameDateWriter());
}
}
incomingBytes=new byte[1024];
try {
String str="";
int iRd;
System.out.println("ThreadMultipleDateReceiver waiting:"+str);
System.out.println("reading: "+pipedInputStream);
while((iRd=pipedInputStream.read(incomingBytes))!=-1) {
if(iRd>0) {
str+=new String(incomingBytes);
}
}
System.out.println("ThreadMultipleDateReceiver Received:\n\t:"+str);
} catch(IOException e) {}
} else {
System.out.println("ThreadMultipleDateReceiver Empty");
}
}
emptyDateWriters();
}
public void addDateWriter(ThreadStreamDateWriter threadDateWriter) {
if(lThrdDate.size()<MAX_CLIENT_THREADS) {
lThrdDate.add(threadDateWriter);
}
}
private void emptyDateWriters() {
if(!lThrdDate.isEmpty()) {
for(int i=lThrdDate.size();i>0;i--) {
ThreadStreamDateWriter threadDateWriter=lThrdDate.get(i-1);
threadDateWriter.stopThread();
lThrdDate.remove(i-1);
}
}
}
public PipedInputStream getPipedInputStream() {
return pipedInputStream;
}
public void stopThread() {
isRunning=false;
}
}
public static class ThreadStreamDateWriter extends Thread {
String Self;
private byte[] outgoingBytes;
private volatile boolean isRunning;
private /*static*/ PipedOutputStream pipedOutputStream;
ThreadStreamDateWriter(String name,PipedInputStream snk) {
Self=name;
pipedOutputStream=new PipedOutputStream();
System.out.println("ctor setting pipedOutputStream to: "+pipedOutputStream);
try {
pipedOutputStream.connect(snk);
System.out.println(pipedOutputStream+" connectd to: "+snk);
} catch(IOException e) {}
}
@Override public void run() {
isRunning=true;
while(isRunning) {
try {
outgoingBytes=getInfo().getBytes();
System.out.println("ThreadStreamDateWriter -> write to pipedOutputStream:"+new String(outgoingBytes));
System.out.println("writing to: "+pipedOutputStream);
pipedOutputStream.write(outgoingBytes);
System.out.println("ThreadStreamDateWriter -> wrote:"+new String(outgoingBytes));
try {
Thread.sleep(4000);
} catch(InterruptedException ex) {}
} catch(IOException|NegativeArraySizeException|IndexOutOfBoundsException e) {
isRunning=false;
}
}
}
String getInfo() {
String sDtTm=new SimpleDateFormat("yyyyMMdd-hhmmss").format(Calendar.getInstance().getTime());
return Self+" -> "+sDtTm;
}
public void stopThread() {
isRunning=false;
}
public String getNameDateWriter() {
return Self;
}
}
private void foo() {
if(thrdMDateReceiver==null) {
thrdMDateReceiver=new ThreadMultipleDateReceiver();
thrdMDateReceiver.start();
}
if(thrdSDateWriter0==null) {
thrdSDateWriter0=new ThreadStreamDateWriter("-0-",thrdMDateReceiver.getPipedInputStream());
thrdSDateWriter0.start();
thrdMDateReceiver.addDateWriter(thrdSDateWriter0);
}
if(thrdSDateWriter1==null) {
thrdSDateWriter1=new ThreadStreamDateWriter("-1-",thrdMDateReceiver.getPipedInputStream());
thrdSDateWriter1.start();
thrdMDateReceiver.addDateWriter(thrdSDateWriter1);
}
}
void run() throws InterruptedException {
System.out.println(("running"));
foo();
System.out.println(("sleeping"));
Thread.sleep(10000);
System.out.println(("stopping"));
if(thrdMDateReceiver!=null) {
thrdMDateReceiver.stopThread();
}
}
public static void main(String[] args) throws InterruptedException {
new So44438086().run();
}
ThreadMultipleDateReceiver thrdMDateReceiver=null;
ThreadStreamDateWriter thrdSDateWriter0=null;
ThreadStreamDateWriter thrdSDateWriter1=null;
}
关于java - 使用 PipedOutputStream 和 PipedInputStream 的多线程写入器和单线程读取器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44438086/
我正在使用java管道将数据(outstream)从解压模块(JavaUncompress类)传递到解析模块(处理类),文件很大,我想先解压文件并直接解析而不是保存解压缩的文件,然后解析。但是,它仅适
我可以为多线程使用一个 pipedoutputstream 并将其连接到一个 pipedinputstream,然后从多线程获取所有输出吗? 以下是代码片段,我想要的是修改和删除线程的输出可以用于同步
我正在使用管道输出流将 OutputStream 转换为 InputStream 因为 AWS java sdk 不允许使用 OutputStreams 我正在使用下面的代码,但是,这会间歇性地挂起。
我的目标是: 从 S3 读取文件, 更改其元数据 再次推送到S3 AWS java SDK 不允许推送输出流。因此,我必须将 outputstream 从 step2 转换为 inputstream。
我正在学习 java PipedInputStream/PipeOutputStream 。 我想读取标准输入(下面的“Source”类)并将其重定向到一个进程(此处为“grep A”),Grep 的
我正在尝试使用 PipedInputStream 和 PipedOutputStream 实现一个线程循环缓冲区,但每次当我进入 Decoder runnable 中的 mHead.write 时它都
我在 SO 上看到了两个答案,它们声称 Java 提供的 PipedInputStream 和 PipedOutputStream 类存在缺陷。但他们没有详细说明他们出了什么问题。他们真的有缺陷吗?如
当我尝试将 OutputStream 复制到 InputStream 时,我偶然发现了 PipedOutputStream 的一个非常奇怪的行为。另请参阅How to write an nt:file
我在 Java 中使用 PipedOutputStream 和 PipedInputStream。 一个线程正在生成字节并写入它们;另一个正在消耗它们。 我想确保生成字节的速度不会明显快于消耗字节的速
什么是管道流的用例?为什么不将数据读入缓冲区然后将它们写出呢? 最佳答案 BlockingQueue 或类似的集合可能会更好地为您服务,它们是线程安全的、健壮的并且扩展性更好。 关于java - Pi
我正在编写一个简单的 Swing GUI,其中包含一个用于打印调试消息和异常的文本字段。我目前在写入 PipedOutputStream 的地方设置了它,并且我有一个守护线程,它从连接的 PipedI
我有一个数据生成器,它在单独的线程中运行并将生成的数据推送到连接到 PipedInputStream 的 PipedOutputStream 中。此输入流的引用通过公共(public) API 公开,
PipedOutputStream的Android实现 write(byte[] buffer, int offset, int count) 是根据write(byte oneByte)实现的。更具
我有一个具有单读者线程模型的多写入线程。ThreadMultipleDateReceiver 类设计用于从多个线程读取。 public class ThreadMultipleDateReceiver
我在 Scala 中有以下代码: val pos = new PipedOutputStream() val pis = new PipedInputStream(pos)
我已经使用 JTextArea 开发了一个小型控制台。我阅读了一些教程并了解了一些东西。但我仍然有问题。这是我的代码。 public class Console extends JFrame {
如何正确完成管道输出端的工作?我需要写入线程终止或做一些其他工作,而读取线程读取所有写入数据直到结束。 我应该在写入端关闭管道还是什么? 更新 1 我想澄清一下......根据给定的答案,我认为设计管
我想使用 PipedOutputStream 和 PipedInputStream Java 类编写类似于Producer Consumer Problem 的示例。 注意:这是应用其原理的小示例。
我最近发现了这个成语,我想知道我是否遗漏了什么。我从未见过它使用过。我在野外使用过的几乎所有 Java 代码都倾向于将数据放入字符串或缓冲区中,而不是像这个示例(例如使用 HttpClient 和 X
我正在向 PipedOutputStream 写入一些数据,该数据已连接到 PipedInputStream,并将其传递给某些第三方 API。写入 PipedOutputStream 是在另一个线程上
我是一名优秀的程序员,十分优秀!