gpt4 book ai didi

java - PipedInputStream.java 的设计中是否存在错误或者我误解了它的设计?

转载 作者:行者123 更新时间:2023-12-02 05:00:03 26 4
gpt4 key购买 nike

我有一个与 PipedInputStream 相关的问题和 PipedOutputStream ,也不知道是我对这些类的设计有误解,还是PipedInputStream.java中的java代码有bug

据我了解PipedInputStreamPipedOutputStream实现一种可用于在两个不同线程之间创建流的机制。生产者线程在 PipedOutputStream 中写入了一些内容。消费者线程在连接的 PipedInputStream 中读取它.有一个内部缓冲区以允许缓冲通信。默认情况下,此缓冲区的大小为 1024 字节。

如果消费者线程读取 PipedInputStream并且缓冲区为空,然后线程等待。如果生产者线程写入 PipedOutputStream并且缓冲区已满,然后线程也等待。
PipedInputStream维护内部缓冲区,PipedOutputStream只使用 PipedInputStream 中声明的函数.
PipedInputStream 中与内部(循环)缓冲区相关的所有字段( byte [] bufferint inint out -正如您在 PipedInputStream.java 中看到的那样)都声明为 protected . PipedInputStream使用 2 个不同的 PipedInputStream.receive 注入(inject)数据职能。

所有 InputStreams 都有两个读取版本:read()read(byte [], int, int) .所有的 OutputStreams 都有两个写入版本 write(byte b)write(byte [], int, int) .都有一个单字节版本和一个多字节版本。 PipedInputStreamPipedOutputStream有这些功能。
PipedOutputStream.write(byte b)使用 PipedInputStream.receive(int b)在连接的PipedInputStream中注入(inject)字节的函数.此接收函数声明为 protected所以你可以重载这个函数并拦截来自PipedOutputStream的任何字节注入(inject)连接到PipedInputStream .
PipedOutputStream.write(byte b[], int offset, int len)使用 PipedInputStream.receive(byte [] b, int offset, int len)在连接的 PipedInputStream 中注入(inject)一个字节数组.

我的问题来了:PipedInputStream.receive(byte [], int, int) , receive(int) 的多字节对应物, 未被声明为 receive(int)也就是说,它具有默认的可见性(包可见性)。所以你不能重载这个函数并拦截来自 PipedOutputStream 的多字节注入(inject)。连接到PipedInputStream .
PipedInputStream.write(byte b[], int offset, int len)不调用 PipedInputStream.write(int b) .所以重载receive(int)使用 receive(byte [],int, int) 时无效.

据我了解,PipedInputStream.receive(byte[], int, int)应该是 protectedPipedInputStream.receive(int)是。它的声明:
synchronized void receive(byte [] b, int off, int len) throws IOException {
应该:
protected synchronized void receive(byte [] b, int off, int len) throws IOException {PipeReaderPipeWriter (PipedInputStreamPipedOutputStream 的字符版本)声明缓冲区字段和具有包可见性的接收方法(不 protected !)。 Java 中的 Reader/Writer(JDK1.1 起)比 InputStream/OutputStream(JDK1.0 起)更新。
PipedInputStream 的设计中是否存在真正的错误? ?, 是 protected PipedInputStream 中的可见度从早期 Java 版本继承的设计事故?还是我完全迷失了?

提前致谢。

PD:这是出现此问题的示例。该程序无法编译(由提到的接收可见性问题)。在这个例子中,我尝试创建一个 PipedInputStream允许在需要时自动扩展缓冲区的子类。因此,如果缓冲区为空并且有人尝试读取线程等待。但是,如果缓冲区已满并且有人尝试写入(使用连接的 PipedOutputStream ),则线程不会等待,而是扩展缓冲区以存储更多字节。消费者等待,但生产者不等待。

我有自己的这个例子的功能实现,但我想知道它是否不能实现为 PipedInputStream子类。

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class ExtensiblePipedInputStream extends PipedInputStream {

/**
* Default extensions' size
*/
private static final int DEFAULT_EXTENSION = 1024;

/**
* The current extensions' size
*/
protected int extension = DEFAULT_EXTENSION;


// the same constructors than the super class (PipedInputStream)...

public ExtensiblePipedInputStream() {
super();
}

public ExtensiblePipedInputStream(PipedOutputStream src) throws IOException {
super(src);
}

public ExtensiblePipedInputStream(int pipeSize) {
super(pipeSize);
}

public ExtensiblePipedInputStream(PipedOutputStream src, int pipeSize) throws IOException {
super(src, pipeSize);
}

/**
* This function ensures the specified capacity in the internal buffer. If
* the specified capacity is less or equals than the current internal buffer
* capacity it does nothing. If the specified capacity is greater than the
* current one, then the buffer is extended to: at least allocate the new
* capacity. This function extends the buffer using multiple factors of
* extension size.
*
* @param capacity The capacity
* @throws IOException if an IO error occurs
* @throws IllegalArgumentException if capacity is negative
*/
public synchronized void ensureCapacity(int capacity) throws IOException, IllegalArgumentException {
if (capacity < 0) {
throw new IllegalArgumentException("capacity < 0");
}

if (capacity > buffer.length) {
int additionalSpace = capacity - buffer.length;
final int modExtension = additionalSpace % extension;
additionalSpace += (modExtension == 0) ? 0 : extension - modExtension;

setCapacity(buffer.length + additionalSpace);
}
}

/**
* Returns capacity of the internal buffer (the buffer's size).
*
* @return The capacity or the internal buffer
*/
public synchronized int getCapacity() {
return buffer.length;
}

/**
* Returns the size of the next buffer's extensions.
*
* @return The size of the next buffer's extensions.
*/
public synchronized int getExtension() {
return extension;
}

/**
* This function extends and invokes PipedInputStream.receive. It only avoid
* writers block by extending the internal buffer when needed.
*
* @param b The byte to be received
* @throws IOException if an IO error occurs
*/
@Override
protected synchronized void receive(int b) throws IOException {
ensureCapacity(available() + 1);
super.receive(b);
}

/**
* MY PROBLEM!!!!
*
* this function is not posible!
*
* PipedInputStream.receive(byte[], int, int)
* has not protected visibility, it has package visibility!!!!!
*
* Why?
*
* @param b The array of bytes to be received
* @param off The offset in the array of bytes.
* @param len The number of bytes to be received.
* @throws IOException If an IO error occurs
*/
@Override
protected synchronized void receive(byte b[], int off, int len) throws IOException {
ensureCapacity(available() + len);
super.receive(b, off, len);
}

/**
* Changes the size of the internal buffer. The new size must be greater or
* equals than the number of bytes stored in the internal buffer
* (available())
*
* @param capacity The new size of the internal buffer.
* @throws IOException If an IO error occurs.
* @throws IllegalArgumentException If capacity < available()
*/
public synchronized void setCapacity(int capacity) throws IOException, IllegalArgumentException {
final int available = available();

if (capacity < available) {
throw new IllegalArgumentException("capacity < available");
}

final byte[] nbuf = new byte[capacity];
if (available > 0) {
final int firstTransferAmount = Math.min(available, buffer.length - out);
System.arraycopy(buffer, out, nbuf, 0, firstTransferAmount);
if (in > 0) {
System.arraycopy(buffer, 0, nbuf, firstTransferAmount, in);
}
out = 0;
in = (available == capacity) ? 0 : available;
}

buffer = nbuf;
}

/**
* Set the size of future extensions. It must be a value greater than 0.
*
* @param extension The size of future extensions.
* @throws IllegalArgumentException If extension <= 0
*/
public synchronized void setExtension(int extension) throws IllegalArgumentException {
if (extension <= 0) {
throw new IllegalArgumentException("extension <= 0");
}

this.extension = extension;
}

}

最佳答案

我认为您误解了管道的用途。管道用于 阻塞 两个不同线程之间的通信。写入器的速度预计将受限于读取器的速度,这意味着管道在内存使用方面是有效的,但会将处理限制为最慢组件的速度。

如果您想要异步写入,您应该考虑使用 Queue - java.util.concurrent 包中的版本之一应该适合。

关于java - PipedInputStream.java 的设计中是否存在错误或者我误解了它的设计?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28364621/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com