gpt4 book ai didi

java - PipedInputStream 和 PipedOutputStream 中的异常传播

转载 作者:搜寻专家 更新时间:2023-10-31 20:10:07 33 4
gpt4 key购买 nike

我有一个数据生成器,它在单独的线程中运行并将生成的数据推送到连接到 PipedInputStreamPipedOutputStream 中。此输入流的引用通过公共(public) API 公开,以便任何客户端都可以使用它。 PipedInputStream 包含一个有限的缓冲区,如果缓冲区已满,将阻止数据生产者。基本上,当客户端从输入流中读取数据时,数据生产者 会生成新数据。

问题是数据生产者可能会失败并抛出异常。但是由于消费者在单独的线程中运行,因此没有很好的方法将异常传递给客户端。

我所做的是捕获该异常并关闭输入流。这将导致 IOException 在客户端出现消息“Pipe closed”,但我真的很想向客户说明背后的真正原因。

这是我的 API 的粗略代码:

public InputStream getData() {
final PipedInputStream inputStream = new PipedInputStream(config.getPipeBufferSize());
final PipedOutputStream outputStream = new PipedOutputStream(inputStream);

Thread thread = new Thread(() -> {
try {
// Start producing the data and push it into output stream.
// The production my fail and throw an Exception with the reason
} catch (Exception e) {
try {
// What to do here?
outputStream.close();
inputStream.close();
} catch (IOException e1) {
}
}
});
thread.start();

return inputStream;
}

我有两个解决方法:

  1. 将异常存储在父对象中,并通过 API 将其公开给客户端。 IE。如果读取失败并出现 IOException,客户端可以向 API 询问原因。
  2. 扩展/重新实现管道流,以便我可以将原因传递给 close() 方法。然后流抛出的 IOException 可以包含该原因作为消息。

有什么更好的主意吗?

最佳答案

巧合的是,我刚刚编写了类似的代码来允许对流进行 GZip 压缩。您不需要扩展 PipedInputStream,只需 FilterInputStream将执行并返回一个包装版本,例如

final PipedInputStream in = new PipedInputStream();
final InputStreamWithFinalExceptionCheck inWithException = new InputStreamWithFinalExceptionCheck(in);
final PipedOutputStream out = new PipedOutputStream(in);
Thread thread = new Thread(() -> {
try {
// Start producing the data and push it into output stream.
// The production my fail and throw an Exception with the reason
} catch (final IOException e) {
inWithException.fail(e);
} finally {
inWithException.countDown();
}
});
thread.start();
return inWithException;

然后 InputStreamWithFinalExceptionCheck 就是

private static final class InputStreamWithFinalExceptionCheck extends FilterInputStream {
private final AtomicReference<IOException> exception = new AtomicReference<>(null);
private final CountDownLatch complete = new CountDownLatch(1);

public InputStreamWithFinalExceptionCheck(final InputStream stream) {
super(stream);
}

@Override
public void close() throws IOException {
try {
complete.await();
final IOException e = exception.get();
if (e != null) {
throw e;
}
} catch (final InterruptedException e) {
throw new IOException("Interrupted while waiting for synchronised closure");
} finally {
stream.close();
}
}

public void fail(final IOException e) {
exception.set(Preconditions.checkNotNull(e));
}

public void countDown() {complete.countDown();}
}

关于java - PipedInputStream 和 PipedOutputStream 中的异常传播,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33686430/

33 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com