- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 Java8 中的流创建一个生产者多个消费者模型。我正在读取和处理来自数据库资源的数据,并且 我想以流方式处理它们(不能将整个资源读入内存)。
源的读取必须是单线程的(游标不是线程安全的)并且读取速度很快,而不是重操作的每个数据 block 的处理可以并行运行。
我还没有找到如何将非并行流与并行流处理连接(互连)的方法。有什么办法可以使用 Java8 流 API 来实现吗?
代码示例:
这个迭代器必须在单线程中运行,因为游标不是线程安全的。
class SimpleIterator<Data> implements Iterator<Data>{
private volatile Cursor cursor;
public SimpleIterator(Cursor cursor){
this.cursor = cursor;
}
@Override
public boolean hasNext() {
return cursor.hasNext();
}
@Override
public Data next() {
return cursor.next();
}
}
//创建非并行流
SimpleIterator<Data> iterator = new SimpleIterator<>(queryCursor);
Iterable<Data> iterable = () -> iterator;
Stream<Data> resultStream = StreamSupport.stream(iterable.spliterator(), false); // prallel set as false
//每个数据的处理数据应该并行运行
resultStream.parallel().forEach(data->processData(data));
public processData(Data data){
//heavy operation
}
但是,如果我在调用 forEach 之前将流设置为并行,那么整个流都是并行的,并且迭代器也在多个线程中调用。有什么方法可以在 Java8 中互连这两个流,或者我必须创建一些队列来提供从单线程生产者流到并行流的数据。
最佳答案
我正在解决一个问题,我需要对两个流进行完全外部连接。问题似乎很相似。我所做的是插入两个阻塞队列来缓冲我的输入。我认为您可以对一个阻塞队列执行类似的操作,将单个流拆分为多个流,而无需并行化源流。
我建议的解决方案可以在下面找到。我还没有测试我加入两个流的解决方案,所以我不确定这是否有效。 AbstractSpliterator 类有一个 trySplit 的实现; trySplit 上的评论提供了很多信息。该类的 final方法从拆分器实现构造一个可并行化的流。
import java.util.Spliterators;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Stream;
public class StreamSplitter<T> extends Spliterators.AbstractSpliterator<T> {
final T EOS = null; // Just a stub -- can't put a null in BlockingQueue
private final BlockingQueue<T> queue;
private final Thread thread;
// An implementation of Runnable that fills a queue from a stream
private class Filler implements Runnable {
private final Stream<T> stream;
private final BlockingQueue<T> queue;
private Filler(Stream<T> stream, BlockingQueue<T> queue) {
this.stream = stream;
this.queue = queue;
}
@Override
public void run() {
stream.forEach(x -> {
try {
// Blocks if the queue is full
queue.put(x);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// Stream is drained put end of stream marker.
try {
queue.put(EOS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private StreamSplitter(long estSize, int characteristics, Stream<T> srcStream) {
super(estSize, characteristics);
queue = new ArrayBlockingQueue<T>(1024);
// Fill the queue from a separate thread (may want to externalize this).
thread = new Thread(new Filler(srcStream, queue));
thread.start();
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
try {
T value = queue.take(); // waits (blocks) for entries in queue
// If end of stream marker is found, return false signifying
// that the stream is finished.
if (value == EOS) {
return false;
}
// Accept the next value.
action.accept(value);
} catch (InterruptedException e) {
return false;
}
return true;
}
public static <T> Stream<T> splitStream(long estSize, int characteristics, Stream<T> srcStream) {
Spliterator<T> spliterator = new StreamSplitter<T>(estSize, characteristics, srcStream);
return StreamSupport.stream(spliterator, true);
}
}
关于java - 如何将非并行流与并行流(一个生产者多个消费者)互连,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36791761/
我正在尝试使用 Java8 中的流创建一个生产者多个消费者模型。我正在读取和处理来自数据库资源的数据,并且 我想以流方式处理它们(不能将整个资源读入内存)。 源的读取必须是单线程的(游标不是线程安全的
我需要通过 RESTful 接口(interface)连接到另一个系统的 API,该接口(interface)将通过 websocket 连接提供事件。 我正在使用一个 javascript 库来处理
我有以下要求。请提前为我提供一些帮助。 软件:-1.Unix2. Apache 2.03.tomcat 6.0.35 当前情况:-1.MOD_JK连接Apache和tomcat2.Tomcat有多个w
我尝试设置3个jquery ui slider 互连,3个 slider 的总和必须始终保持在100,因此当我更改一个 slider 的值时,其他 slider 必须更新。 例如,如果一个 slide
我正在尝试在 eth0(USB 调制解调器)和 wlan0(Wifi 热点)之间建立一座桥,或者将所有流量从 wlan0 路由到 eth0。 程序: 如果接口(interface)已经有IP,桥接将不
我已经用 Python 完成了计算理论概念的数学模拟编码,例如语法检查和其他内容。我的问题是我必须为它构建一个看起来像样的 GUI。 我看过 PyQt4,缺乏文档确实是一个很大的阻碍。我查看了 Pyt
考虑 PL (FPGA) 端的 AXI4 互连。 当我双击查看可用选项时,从属接口(interface)中有一个选项卡。包含以下选项。 启用寄存器片的目的是什么? outer指的是二级缓存吗? Aut
我在我的程序中与 JGraphT 的 JGraphXAdapter 可视化了关系. 不幸的是,我只需要允许用户对图形进行视觉修改,即移动/调整节点大小。但他仍然可以编辑一些东西,尽管我禁用了我发现的所
我正在尝试改进我的 UI 以将两个组合框相互链接。 我的 xml 文件包含: 我正在从 xml 文件中读取数据,并将所有 org_Id (4, 5) 放在一个 QStringList
我是一名优秀的程序员,十分优秀!