java - Parallel computation over an iterator of elements in Java

Reposted. Author: 塔克拉玛干. Updated: 2023-11-02 07:58:54
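I have run into the same need several times now, and I would like other ideas on the right way to build a solution. The need is to perform some operation on many elements across multiple threads without holding all of the elements in memory at once, only the ones currently being computed. For example, Iterables.partition is not sufficient, because it brings all of the elements into memory up front.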

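Expressed in code, I want to write a BulkCalc2 that does the same thing as BulkCalc1, only in parallel. Below is sample code showing my best attempt. I am not satisfied with it because it is big and ugly. It does seem to meet my goals, though:

- it keeps thread utilization high until the work is done;
- it propagates any exception thrown during the computation;
- it never has more than numThreads BigThing instances in memory at once.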

I will accept the answer that meets the stated goals in the most concise way, whether it improves my BulkCalc2 or is an entirely different solution.

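import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

interface BigThing {

    int getId();

    String getString();
}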

class Calc {

    // somewhat expensive computation
    double calc(BigThing bigThing) {
        Random r = new Random(bigThing.getString().hashCode());
        double d = 0;
        for (int i = 0; i < 100000; i++) {
            d += r.nextDouble();
        }
        return d;
    }
}

class BulkCalc1 {

    final Calc calc;

    public BulkCalc1(Calc calc) {
        this.calc = calc;
    }

    public TreeMap<Integer, Double> calc(Iterator<BigThing> in) {
        TreeMap<Integer, Double> results = Maps.newTreeMap();
        while (in.hasNext()) {
            BigThing o = in.next();
            results.put(o.getId(), calc.calc(o));
        }
        return results;
    }
}

class SafeIterator<T> {

    final Iterator<T> in;

    SafeIterator(Iterator<T> in) {
        this.in = in;
    }

    synchronized T nextOrNull() {
        if (in.hasNext()) {
            return in.next();
        }
        return null;
    }
}

class BulkCalc2 {

    final Calc calc;
    final int numThreads;

    public BulkCalc2(Calc calc, int numThreads) {
        this.calc = calc;
        this.numThreads = numThreads;
    }

    public TreeMap<Integer, Double> calc(Iterator<BigThing> in) {
        ExecutorService e = Executors.newFixedThreadPool(numThreads);
        List<Future<?>> futures = Lists.newLinkedList();

        final Map<Integer, Double> results = new MapMaker().concurrencyLevel(numThreads).makeMap();
        final SafeIterator<BigThing> it = new SafeIterator<BigThing>(in);
        for (int i = 0; i < numThreads; i++) {
            futures.add(e.submit(new Runnable() {

                @Override
                public void run() {
                    while (true) {
                        BigThing o = it.nextOrNull();
                        if (o == null) {
                            return;
                        }
                        results.put(o.getId(), calc.calc(o));
                    }
                }
            }));
        }

        e.shutdown();

        for (Future<?> future : futures) {
            try {
                future.get();
            } catch (InterruptedException ex) {
                // swallowing is OK
            } catch (ExecutionException ex) {
                throw Throwables.propagate(ex.getCause());
            }
        }

        return new TreeMap<Integer, Double>(results);
    }
}
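The goals stated in the question can also be met without Guava. Below is a minimal JDK 8+ sketch of the same "pull" model that BulkCalc2 uses: each worker takes exactly one element at a time under a lock on the shared iterator.

Some details are assumptions for illustration and do not come from the question:

- the class name PullModelSketch;
- the generic Function-based signature;
- the inFlight/maxInFlight counters, which are there only to make the "at most numThreads elements in memory" property observable.

Unlike BulkCalc2, this sketch does not swallow InterruptedException. It stops the pool, restores the interrupt flag and throws.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical JDK-only sketch of BulkCalc2's pull model (no Guava).
final class PullModelSketch {
    // Instrumentation only: elements currently being computed, and the peak value seen.
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger maxInFlight = new AtomicInteger();

    static <T> Map<T, Double> calcAll(Iterator<T> it, Function<T, Double> calc, int numThreads) {
        Map<T, Double> results = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < numThreads; i++) {
            futures.add(pool.submit(() -> {
                while (true) {
                    T item;
                    // Only one worker at a time touches the iterator, and each worker
                    // holds a single element, so at most numThreads are checked out.
                    synchronized (it) {
                        if (!it.hasNext()) {
                            return;
                        }
                        item = it.next();
                    }
                    maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                    try {
                        results.put(item, calc.apply(item));
                    } finally {
                        inFlight.decrementAndGet();
                    }
                }
            }));
        }
        pool.shutdown(); // workers exit on their own once the iterator is drained
        for (Future<?> f : futures) {
            try {
                f.get();
            } catch (ExecutionException ex) {
                // Rethrow the worker's failure without Guava's Throwables.propagate.
                Throwable cause = ex.getCause();
                if (cause instanceof RuntimeException) throw (RuntimeException) cause;
                if (cause instanceof Error) throw (Error) cause;
                throw new RuntimeException(cause);
            } catch (InterruptedException ex) {
                // Instead of swallowing: stop the workers and keep the interrupt visible.
                pool.shutdownNow();
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", ex);
            }
        }
        return results;
    }
}
```

For brevity, results are keyed by the element itself. With the question's types you would key by getId() and copy the map into a TreeMap, as BulkCalc2 does.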

Best answer

And here is the succinct way (slower, and not as robust or clean, but still quite decent):

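It is 33 lines of computation, all in one method. It is less efficient because every element fetch synchronizes on the shared iterator. Also, unlike my previous answer, it loses a thread for every exception it handles, and that thread then has to be created again. The previous answer I posted simply collects all exceptions into a tidy package to be handled later. That performs better when exceptions occur now and then, because creating threads is moderately expensive.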

// Needs java.util.*, java.util.concurrent.* and Guava's Throwables.
/** More succinct */
public static Map<Integer, Double> bulkCalcSuccincter(final Iterator<BigThing> it, final Calc calc, final int numThreads) {
    final ConcurrentHashMap<Integer, Double> results = new ConcurrentHashMap<Integer, Double>();
    final List<Future<?>> futures = new ArrayList<Future<?>>();
    final ExecutorService e = Executors.newFixedThreadPool(numThreads);

    for (int i = 0; i < numThreads; i++) {
        futures.add(e.submit(new Runnable() {
            public void run() {
                while (true) {
                    BigThing thing = null;
                    synchronized (it) {
                        thing = (it.hasNext()) ? it.next() : null;
                    }
                    if (thing == null) {
                        break;
                    }
                    results.put(thing.getId(), calc.calc(thing));
                }
            }
        }));
    }
    e.shutdown();

    for (Future<?> f : futures) {
        try {
            f.get();
        } catch (InterruptedException ex) {
            // swallowing is better than spitting it out
        } catch (ExecutionException ex) {
            throw Throwables.propagate(ex.getCause());
        }
    }
    return results;
}
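The answer contrasts this code with an earlier version that "collects all exceptions into a tidy package to be handled later". That earlier code does not appear on this page. What follows is only a hypothetical JDK-only sketch of the idea; the class name CollectingBulkCalc and its signature are assumptions.

- Each worker catches a RuntimeException per element, records it, and continues with the next element, so no worker stops early.
- After the pool terminates, every recorded failure is attached with addSuppressed to one summary exception, which is then thrown.
- Only RuntimeExceptions are collected. An Error still escapes its worker.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical sketch: keep going after a failure and report every failure at the end.
final class CollectingBulkCalc {

    static <T> Map<T, Double> calcAll(Iterator<T> it, Function<T, Double> calc, int numThreads) {
        Map<T, Double> results = new ConcurrentHashMap<>();
        Queue<RuntimeException> failures = new ConcurrentLinkedQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        for (int i = 0; i < numThreads; i++) {
            pool.execute(() -> {
                while (true) {
                    T item;
                    synchronized (it) { // one element at a time under the iterator's lock
                        if (!it.hasNext()) {
                            return;
                        }
                        item = it.next();
                    }
                    try {
                        results.put(item, calc.apply(item));
                    } catch (RuntimeException ex) {
                        failures.add(ex); // record it; this worker moves on to the next element
                    }
                }
            });
        }
        pool.shutdown();
        try {
            // Effectively "wait until every worker has drained the iterator".
            pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException ex) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", ex);
        }
        if (!failures.isEmpty()) {
            // One exception that carries every per-element failure as a suppressed exception.
            RuntimeException summary = new RuntimeException(failures.size() + " element(s) failed");
            failures.forEach(summary::addSuppressed);
            throw summary;
        }
        return results;
    }
}
```

Parallelism stays at numThreads even when some elements fail. The trade-off is that the caller learns about failures only after every element has been processed.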

This question about parallel computation over an iterator of elements in Java was found on Stack Overflow: https://stackoverflow.com/questions/1932201/