gpt4 book ai didi

java - 如何使用队列提供无限流?

转载 作者:塔克拉玛干 更新时间:2023-11-02 20:00:12 28 4
gpt4 key购买 nike

我想要几个读取文件的线程。这些文件是 ZIP 文件,本身包含多个文本文件。因此必须逐行读取每个文件。

文件的任何内容都应该发送到某种队列。队列本身应该从工作线程无限处理。

如果可能的话,如何实现这样的场景?我想出了一些伪代码,但我真的不知道如何实现:

Queue<String> queue;

//multiple threads:
BufferedReader br;
queue.add(br.readLine());

//processing thread for the queue:
queue.stream().parallel().forEach(line -> convertAndWrite(line));

//worker function:
private void convertAndWrite(String line) {
//convert the line to an output format,
//and write each line eg to an output file or perist in DB, whatever
}

最佳答案

查看 How to interconect non-paralel stream with parallel stream(one producer multiple consumers) 的答案.对于这个问题,使用无法并行化的流来填充阻塞队列。实现了一个可并行化的拆分器来耗尽这个队列。如果您希望文件是连续的,那么您可能只有一个读取器来填充队列。

然后您使用 StreamSupport 从拆分器创建一个流。阻塞队列支持并发修改,因此拆分器实现可以并行化,因此您的流可以并行化。如果您的下游编写器是可并行化的,那么您的整个消费者端都可以并行化。

如果您的读者遇到异常,那么您将一个 End-of-Stream 标记(可能作为 final 子句的一部分)插入 BlockingQueue 并重新抛出。只有一个 tryAdvance 调用方(请参阅 AbstractSpliterator),因此单个流结束标记足以终止所有并行流。

关于java - 如何使用队列提供无限流?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29809470/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com