- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我是ETL流程,正在从Spring Data Repository中检索很多实体。然后,我使用并行流将实体映射到不同的实体。
我既可以使用使用者将这些新实体一个接一个地存储在另一个存储库中,也可以将它们收集到一个列表中并以单个批量操作的形式存储。
前者成本很高,而后者可能超出可用内存。
有没有一种很好的方法来收集流中的一定数量的元素(如limit一样),消耗该块并继续并行进行直到处理完所有元素?
最佳答案
我使用分块进行批量操作的方法是使用分区分割器包装器,另一个包装器将默认的分割策略(批量大小的算术级数以1024为增量)覆盖为简单的固定批量分割。像这样使用它:
Stream<OriginalType> existingStream = ...;
Stream<List<OriginalType>> partitioned = partition(existingStream, 100, 1);
partitioned.forEach(chunk -> ... process the chunk ...);
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class PartitioningSpliterator<E> extends AbstractSpliterator<List<E>>
{
private final Spliterator<E> spliterator;
private final int partitionSize;
public PartitioningSpliterator(Spliterator<E> toWrap, int partitionSize) {
super(toWrap.estimateSize(), toWrap.characteristics() | Spliterator.NONNULL);
if (partitionSize <= 0) throw new IllegalArgumentException(
"Partition size must be positive, but was " + partitionSize);
this.spliterator = toWrap;
this.partitionSize = partitionSize;
}
public static <E> Stream<List<E>> partition(Stream<E> in, int size) {
return StreamSupport.stream(new PartitioningSpliterator(in.spliterator(), size), false);
}
public static <E> Stream<List<E>> partition(Stream<E> in, int size, int batchSize) {
return StreamSupport.stream(
new FixedBatchSpliterator<>(new PartitioningSpliterator<>(in.spliterator(), size), batchSize), false);
}
@Override public boolean tryAdvance(Consumer<? super List<E>> action) {
final ArrayList<E> partition = new ArrayList<>(partitionSize);
while (spliterator.tryAdvance(partition::add)
&& partition.size() < partitionSize);
if (partition.isEmpty()) return false;
action.accept(partition);
return true;
}
@Override public long estimateSize() {
final long est = spliterator.estimateSize();
return est == Long.MAX_VALUE? est
: est / partitionSize + (est % partitionSize > 0? 1 : 0);
}
}
import static java.util.Spliterators.spliterator;
import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
public abstract class FixedBatchSpliteratorBase<T> implements Spliterator<T> {
private final int batchSize;
private final int characteristics;
private long est;
public FixedBatchSpliteratorBase(int characteristics, int batchSize, long est) {
characteristics |= ORDERED;
if ((characteristics & SIZED) != 0) characteristics |= SUBSIZED;
this.characteristics = characteristics;
this.batchSize = batchSize;
this.est = est;
}
public FixedBatchSpliteratorBase(int characteristics, int batchSize) {
this(characteristics, batchSize, Long.MAX_VALUE);
}
public FixedBatchSpliteratorBase(int characteristics) {
this(characteristics, 64, Long.MAX_VALUE);
}
@Override public Spliterator<T> trySplit() {
final HoldingConsumer<T> holder = new HoldingConsumer<>();
if (!tryAdvance(holder)) return null;
final Object[] a = new Object[batchSize];
int j = 0;
do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
if (est != Long.MAX_VALUE) est -= j;
return spliterator(a, 0, j, characteristics());
}
@Override public Comparator<? super T> getComparator() {
if (hasCharacteristics(SORTED)) return null;
throw new IllegalStateException();
}
@Override public long estimateSize() { return est; }
@Override public int characteristics() { return characteristics; }
static final class HoldingConsumer<T> implements Consumer<T> {
Object value;
@Override public void accept(T value) { this.value = value; }
}
}
import static java.util.stream.StreamSupport.stream;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
public class FixedBatchSpliterator<T> extends FixedBatchSpliteratorBase<T> {
private final Spliterator<T> spliterator;
public FixedBatchSpliterator(Spliterator<T> toWrap, int batchSize, long est) {
super(toWrap.characteristics(), batchSize, est);
this.spliterator = toWrap;
}
public FixedBatchSpliterator(Spliterator<T> toWrap, int batchSize) {
this(toWrap, batchSize, toWrap.estimateSize());
}
public FixedBatchSpliterator(Spliterator<T> toWrap) {
this(toWrap, 64, toWrap.estimateSize());
}
public static <T> Stream<T> withBatchSize(Stream<T> in, int batchSize) {
return stream(new FixedBatchSpliterator<>(in.spliterator(), batchSize), true);
}
public static <T> FixedBatchSpliterator<T> batchedSpliterator(Spliterator<T> toWrap, int batchSize) {
return new FixedBatchSpliterator<>(toWrap, batchSize);
}
@Override public boolean tryAdvance(Consumer<? super T> action) {
return spliterator.tryAdvance(action);
}
@Override public void forEachRemaining(Consumer<? super T> action) {
spliterator.forEachRemaining(action);
}
}
关于parallel-processing - 是否有一种从Java 8流中提取数据 block 的好方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25408350/
在 Oracle 中,PARALLEL 被广泛使用。提示 PARALLEL、PARALLEL(8) 和 PARALLEL(a,8) 有什么区别。如何选择最佳的查询提示? SELECT /*+ PARA
好的,我希望以前没有问过这个问题,因为在搜索中很难找到。 我查看了 F95 手册,但仍然觉得这很模糊: For the simple case of: DO i=0,99 END DO 我正
我有一个 C-shell 脚本,其中有一个名为 $hosts_string 的变量,格式为: host1,host2,...,hostN 我还有一个名为 $chrs_string 的变量,其形式为:
是否可以从由gnu parallel产生的脚本的多次运行中调用gnu parallel? 我有一个python脚本,可以运行100个顺序顺序迭代,并且在每次迭代中的某处,并行计算4个值(使用gnu p
我想在几个输入上运行几个长时间运行的进程。例如。: solver_a problem_1 solver_b problem_1 ... solver_b problem_18 solver_c pro
TParallel.&For 和 TParallel.For 之间有区别吗? 两者都可以在 Delphi 10 Seattle 中编译。那么我应该坚持哪一个呢? 最佳答案 TParallel.&For
我第一次使用 julia 进行并行计算.我有点头疼。所以假设我开始 julia如下:julia -p 4 .然后我为所有处理器声明 a 函数,然后将它与 pmap 一起使用还有@parallel fo
关闭。这个问题是off-topic .它目前不接受答案。 想改善这个问题吗? Update the question所以它是 on-topic对于堆栈溢出。 10年前关闭。 Improve this
我有一堆相互排斥的方法,因此可以并行运行。有这样做的好方法吗?到目前为止,我有以下两种实现方式,但我不确定是否应该选择其中一种。 使用 Parallel.For : Parallel.For(0, 2
我对并行运行脚本很感兴趣,并且我已经开始查看 GNU 并行工具,但是我遇到了一些麻烦。我的脚本 doSomething 有 3 个参数,我想在参数的不同值上并行运行脚本。我该怎么做? 我试过:para
我需要在多核(和多线程)机器上运行多个作业。我正在使用 GNU Parallel utility跨核心分配作业以加速任务。要执行的命令在名为“命令”的文件中可用。我使用以下命令运行 GNU Paral
我正在尝试使用如下两个输入运行 Python 脚本。我得到了大约 300 个这两个输入,所以我想知道是否有人可以建议如何并行运行它们。 单次运行看起来像: python stable.py KOG_1
每天我都必须更新一堆存储库,并在其中一些中执行另一个命令(来自 CARTON,Perl 模块依赖管理器)。我总是使用循环来执行此操作,但我想与 并行执行GNU 并行 如果可能,但我不太了解它的tuto
正如标题所说:@parallel 之间究竟有什么区别?和 pmap ?我的意思不是明显的一个是循环的宏,另一个适用于函数,我的意思是它们的实现究竟有什么不同,我应该如何使用这些知识在它们之间进行选择?
我有一些矩阵乘法运算。我想通过多个处理器并行执行这些操作。这可以使用 MPI(消息传递接口(interface))在高性能计算集群上完成。 同样,我可以使用多个辅助角色在云中进行一些并行化吗?有什么办
joblib模块提供了一个简单的帮助程序类,以使用多处理并行编写循环的循环。 这段代码使用列表推导来完成这项工作: import time from math import sqrt from job
我的问题是这样的one .但我想做一些不同的事情... 例如,在我的并行区域内,我想在 4 个线程上运行我的代码。当每个线程进入 for 循环时,我想在 8 个线程上运行我的代码。像 #pramga
我正在尝试使用 ipython 并行库中的并行计算。但是我对此知之甚少,而且我发现很难从对并行计算一无所知的人那里阅读该文档。 有趣的是,我发现的所有教程都只是重复使用文档中的示例,并使用相同的解释,
我的项目结构看起来像 Root + subproj1 + subproj2 在每个子项目中定义了自己的任务 run(){}。 我想要做的是从 Root 项目的运行任务并行运行 :subpro
我有一个 Foo ID 的列表。我需要为每个 ID 调用一个存储过程。 例如 Guid[] siteIds = ...; // typically contains 100 to 300 elemen
我是一名优秀的程序员,十分优秀!