作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 Java8 中的流创建一个生产者多个消费者模型。我正在读取和处理来自数据库资源的数据,并且 我想以流方式处理它们(不能将整个资源读入内存)。
源的读取必须是单线程的(游标不是线程安全的)并且读取速度很快,而不是重操作的每个数据 block 的处理可以并行运行。
我还没有找到如何将非并行流与并行流处理连接(互连)的方法。有什么办法可以使用 Java8 流 API 来实现吗?
代码示例:
这个迭代器必须在单线程中运行,因为游标不是线程安全的。
class SimpleIterator<Data> implements Iterator<Data>{
private volatile Cursor cursor;
public SimpleIterator(Cursor cursor){
this.cursor = cursor;
}
@Override
public boolean hasNext() {
return cursor.hasNext();
}
@Override
public Data next() {
return cursor.next();
}
}
//创建非并行流
SimpleIterator<Data> iterator = new SimpleIterator<>(queryCursor);
Iterable<Data> iterable = () -> iterator;
Stream<Data> resultStream = StreamSupport.stream(iterable.spliterator(), false); // prallel set as false
//每个数据的处理数据应该并行运行
resultStream.parallel().forEach(data->processData(data));
public processData(Data data){
//heavy operation
}
但是,如果我在调用 forEach 之前将流设置为并行,那么整个流都是并行的,并且迭代器也在多个线程中调用。有什么方法可以在 Java8 中互连这两个流,或者我必须创建一些队列来提供从单线程生产者流到并行流的数据。
最佳答案
我正在解决一个问题,我需要对两个流进行完全外部连接。问题似乎很相似。我所做的是插入两个阻塞队列来缓冲我的输入。我认为您可以对一个阻塞队列执行类似的操作,将单个流拆分为多个流,而无需并行化源流。
我建议的解决方案可以在下面找到。我还没有测试我加入两个流的解决方案,所以我不确定这是否有效。 AbstractSpliterator 类有一个 trySplit 的实现; trySplit 上的评论提供了很多信息。该类的 final方法从拆分器实现构造一个可并行化的流。
import java.util.Spliterators;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Stream;
public class StreamSplitter<T> extends Spliterators.AbstractSpliterator<T> {
final T EOS = null; // Just a stub -- can't put a null in BlockingQueue
private final BlockingQueue<T> queue;
private final Thread thread;
// An implementation of Runnable that fills a queue from a stream
private class Filler implements Runnable {
private final Stream<T> stream;
private final BlockingQueue<T> queue;
private Filler(Stream<T> stream, BlockingQueue<T> queue) {
this.stream = stream;
this.queue = queue;
}
@Override
public void run() {
stream.forEach(x -> {
try {
// Blocks if the queue is full
queue.put(x);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// Stream is drained put end of stream marker.
try {
queue.put(EOS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private StreamSplitter(long estSize, int characteristics, Stream<T> srcStream) {
super(estSize, characteristics);
queue = new ArrayBlockingQueue<T>(1024);
// Fill the queue from a separate thread (may want to externalize this).
thread = new Thread(new Filler(srcStream, queue));
thread.start();
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
try {
T value = queue.take(); // waits (blocks) for entries in queue
// If end of stream marker is found, return false signifying
// that the stream is finished.
if (value == EOS) {
return false;
}
// Accept the next value.
action.accept(value);
} catch (InterruptedException e) {
return false;
}
return true;
}
public static <T> Stream<T> splitStream(long estSize, int characteristics, Stream<T> srcStream) {
Spliterator<T> spliterator = new StreamSplitter<T>(estSize, characteristics, srcStream);
return StreamSupport.stream(spliterator, true);
}
}
关于java - 如何将非并行流与并行流(一个生产者多个消费者)互连,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36791761/
我是一名优秀的程序员,十分优秀!