gpt4 book ai didi

java - 缓冲后台 InputStream 实现

转载 作者:塔克拉玛干 更新时间:2023-11-03 03:55:13 25 4
gpt4 key购买 nike

我已经编写了后台 InputStream(和 OutputStream)实现来包装其他流,并在后台线程上提前读取,主要允许解压缩/压缩发生在不同线程处理解压流。

这是一个相当标准的生产者/消费者模型。

这似乎是一种通过读取、处理和写入数据的简单进程充分利用多核 CPU 的简单方法,从而可以更有效地利用 CPU 和磁盘资源。也许“高效”不是最好的词,但与直接从 ZipInputStream 读取并直接写入 ZipOutputStream 相比,它提供了更高的利用率,而且我更感兴趣的是减少了运行时间

我很高兴发布代码,但我的问题是我是否正在重新发明现有(和更频繁使用的)库中现成的东西?

编辑 - 发布代码...

我的 BackgroundInputStream 代码如下(BackgroundOutputStream 非常相似),但我想改进它的某些方面。

  1. 看来我工作太辛苦了,无法来回传递缓冲区。
  2. 如果调用代码丢弃了对 BackgroundInputStream 的引用,backgroundReaderThread 将永远挂起。
  3. eof 信号需要改进。
  4. 应将异常传播到前台线程。
  5. 我想允许使用来自提供的 Executor 的线程。
  6. close() 方法应该向后台线程发出信号,并且不应关闭包装流,因为包装流应该由读取它的后台线程拥有。
  7. 做一些愚蠢的事情,比如在结束后阅读,应该适本地照顾。

package nz.co.datacute.io;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;

public class BackgroundInputStream extends InputStream {
private static final int DEFAULT_QUEUE_SIZE = 1;
private static final int DEFAULT_BUFFER_SIZE = 64*1024;
private final int queueSize;
private final int bufferSize;
private volatile boolean eof = false;
private LinkedBlockingQueue<byte[]> bufferQueue;
private final InputStream wrappedInputStream;
private byte[] currentBuffer;
private volatile byte[] freeBuffer;
private int pos;

public BackgroundInputStream(InputStream wrappedInputStream) {
this(wrappedInputStream, DEFAULT_QUEUE_SIZE, DEFAULT_BUFFER_SIZE);
}

public BackgroundInputStream(InputStream wrappedInputStream,int queueSize,int bufferSize) {
this.wrappedInputStream = wrappedInputStream;
this.queueSize = queueSize;
this.bufferSize = bufferSize;
}

@Override
public int read() throws IOException {
if (bufferQueue == null) {
bufferQueue = new LinkedBlockingQueue<byte[]>(queueSize);
BackgroundReader backgroundReader = new BackgroundReader();
Thread backgroundReaderThread = new Thread(backgroundReader, "Background InputStream");
backgroundReaderThread.start();
}
if (currentBuffer == null) {
try {
if ((!eof) || (bufferQueue.size() > 0)) {
currentBuffer = bufferQueue.take();
pos = 0;
} else {
return -1;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int b = currentBuffer[pos++];
if (pos == currentBuffer.length) {
freeBuffer = currentBuffer;
currentBuffer = null;
}
return b;
}

@Override
public int available() throws IOException {
if (currentBuffer == null) return 0;
return currentBuffer.length;
}

@Override
public void close() throws IOException {
wrappedInputStream.close();
currentBuffer = null;
freeBuffer = null;
}

class BackgroundReader implements Runnable {

@Override
public void run() {
try {
while (!eof) {
byte[] newBuffer;
if (freeBuffer != null) {
newBuffer = freeBuffer;
freeBuffer = null;
} else {
newBuffer = new byte[bufferSize];
}
int bytesRead = 0;
int writtenToBuffer = 0;
while (((bytesRead = wrappedInputStream.read(newBuffer, writtenToBuffer, bufferSize - writtenToBuffer)) != -1) && (writtenToBuffer < bufferSize)) {
writtenToBuffer += bytesRead;
}
if (writtenToBuffer > 0) {
if (writtenToBuffer < bufferSize) {
newBuffer = Arrays.copyOf(newBuffer, writtenToBuffer);
}
bufferQueue.put(newBuffer);
}
if (bytesRead == -1) {
eof = true;
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}
}

最佳答案

听起来很有趣。我从来没有遇到过任何开箱即用的东西,但如果可用的话,尝试使用空闲核心进行压缩是非常有意义的。

也许你可以利用 Commons I/O - 这是一个经过良好测试的库,可以帮助处理一些更无聊的东西,让您专注于扩展很酷的并行部分。也许您甚至可以将您的代码贡献给 Commons 项目 ;-)

关于java - 缓冲后台 InputStream 实现,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/2153911/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com