gpt4 book ai didi

Java:合并 InputStreams

转载 作者:搜寻专家 更新时间:2023-11-01 02:54:43 24 4
gpt4 key购买 nike

我的目标是创建(或使用现有的)一个 InputStream 实现(例如,MergeInputStream),它将尝试从多个 InputStream 中读取并返回第一个结果。之后它将释放锁定并停止从所有 InputStreams 读取,直到下一次 mergeInputStream.read() 调用。我很惊讶我没有找到任何这样的工具。问题是:所有源 InputStreams 都不是很有限(例如,不是文件,而是 System.in、套接字等),所以我不能使用 SequenceInputReader。我知道这可能需要一些多线程机制,但我完全不知道该怎么做。我尝试用谷歌搜索但没有结果。

最佳答案

从多个来源读取输入并将它们序列化为一个流的问题最好使用 SelectableChannel 来解决。和 Selector 。然而,这要求所有来源都能够提供可选择的 channel 。这可能是也可能不是这种情况。

如果可选 channel 不可用,您可以选择用单线程解决,方法是让读取实现执行以下操作:对于每个输入流 is , 检查是否 is.available() > 0 ,如果是,则返回 is.read() .重复此过程,直到某些输入流有可用数据。

但是,这种方法有两个主要缺点:

  1. Not all implementations of InputStream工具 available()当且仅当 read() 时返回 0会阻塞。结果自然是无法从此流中读取数据,即使 is.read() 也是如此。会返回一个值。这是否被视为错误是值得怀疑的,因为文档仅声明它应该返回可用字节数的“估计值”。

  2. 它使用所谓的“忙循环”,这基本上意味着您要么需要在循环中 hibernate (这会导致读取延迟),要么不必要地占用 CPU。

您的第三个选择是通过为每个输入流生成一个线程 来处理阻塞读取。然而,这将需要仔细同步,如果您有大量输入流要读取,则可能需要一些开销。下面的代码是第一次尝试解决它。我不确定它是否充分同步,或者它是否以最佳方式管理线程。

import java.io.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class MergedInputStream extends InputStream {

AtomicInteger openStreamCount;
BlockingQueue<Integer> buf = new ArrayBlockingQueue<Integer>(1);
InputStream[] sources;

public MergedInputStream(InputStream... sources) {
this.sources = sources;
openStreamCount = new AtomicInteger(sources.length);
for (int i = 0; i < sources.length; i++)
new ReadThread(i).start();
}


public void close() throws IOException {
String ex = "";
for (InputStream is : sources) {
try {
is.close();
} catch (IOException e) {
ex += e.getMessage() + " ";
}
}
if (ex.length() > 0)
throw new IOException(ex.substring(0, ex.length() - 1));
}


public int read() throws IOException {
if (openStreamCount.get() == 0)
return -1;

try {
return buf.take();
} catch (InterruptedException e) {
throw new IOException(e);
}
}


private class ReadThread extends Thread {

private final int src;
public ReadThread(int src) {
this.src = src;
}

public void run() {
try {
int data;
while ((data = sources[src].read()) != -1)
buf.put(data);
} catch (IOException ioex) {
} catch (InterruptedException e) {
}
openStreamCount.decrementAndGet();
}
}
}

关于Java:合并 InputStreams,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/3931199/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com