gpt4 book ai didi

java - 确保所有任务完成的同步对象

转载 作者:塔克拉玛干 更新时间:2023-11-03 03:01:39 25 4
gpt4 key购买 nike

我应该使用哪个 Java 同步对象来确保完成任意数量的任务?约束是:

  1. 每项任务都需要很长时间才能完成,适合并行执行任务。
  2. 有太多任务无法放入内存(即我无法将每个任务的 Future 放入 Collection 中,然后调用 get在所有 future 上)。
  3. 我不知道会有多少任务(即我不能使用 CountDownLatch)。
  4. ExecutorService 可能是共享的,所以我不能使用 awaitTermination( long, TimeUnit )

例如,对于 Grand Central Dispatch,我可能会这样做:

let workQueue = dispatch_get_global_queue( QOS_CLASS_BACKGROUND, 0 )
let latch = dispatch_group_create()
let startTime = NSDate()
var itemsProcessed = 0
let countUpdateQueue = dispatch_queue_create( "countUpdateQueue", DISPATCH_QUEUE_SERIAL )
for item in fetchItems() // generator returns too many items to store in memory
{
dispatch_group_enter( latch )
dispatch_async( workQueue )
{
self.processItem( item ) // method takes a non-trivial amount of time to run
dispatch_async( countUpdateQueue )
{
itemsProcessed++
}
dispatch_group_leave( latch )
}
}
dispatch_group_wait( latch, DISPATCH_TIME_FOREVER )
let endTime = NSDate()
let totalTime = endTime.timeIntervalSinceDate( startTime )
print( "Processed \(itemsProcessed) items in \(totalTime) seconds." )

它产生的输出看起来像这样(对于 128 个项目):Processed 128 items in 1.846794962883 seconds.

我用 Phaser 尝试了类似的东西:

final Executor executor = new ThreadPoolExecutor( 64, 64, 1l, MINUTES, new LinkedBlockingQueue<Runnable>( 8 ), new CallerRunsPolicy() );
final Phaser latch = new Phaser( 0 );
final long startTime = currentTimeMillis();
final AtomicInteger itemsProcessed = new AtomicInteger( 0 );
for( final String item : fetchItems() ) // iterator returns too many items to store in memory
{
latch.register();
final Runnable task = new Runnable() {
public void run() {
processItem( item ); // method takes a non-trivial amount of time to run
itemsProcessed.incrementAndGet();
latch.arrive();
}
};
executor.execute( task );
}
latch.awaitAdvance( 0 );
final long endTime = currentTimeMillis();
out.println( "Processed " + itemsProcessed.get() + " items in " + ( endTime - startTime ) / 1000.0 + " seconds." );

任务并不总是在最后一个 print 语句之前完成,我可能会得到如下所示的输出(对于 128 个项目):Processed 121 items in 5.296 seconds.Phaser 甚至正确的对象使用?文档表明它仅支持 65,535 个参与方,因此我需要对要处理的项目进行批处理或引入某种Phaser 分层。

最佳答案

此示例中 Phaser 用法的问题是 CallerRunsPolicy 允许任务在启动线程上执行。因此,当循环仍在进行时,到达方的数量可以等于注册方的数量,从而导致阶段增加。解决方案是用 1 方初始化 Phaser 然后,当循环完成时,到达并等待其他方到达。这确保在所有任务完成之前阶段不会递增到 1。

final Executor executor = new ThreadPoolExecutor( 64, 64, 1l, MINUTES, new LinkedBlockingQueue<Runnable>( 8 ), new CallerRunsPolicy() );
final Phaser latch = new Phaser( 1 );
final long startTime = currentTimeMillis();
final AtomicInteger itemsProcessed = new AtomicInteger( 0 );
for( final String item : fetchItems() ) // iterator returns too many items to store in memory
{
latch.register();
final Runnable task = new Runnable() {
public void run() {
processItem( item ); // method takes a non-trivial amount of time to run
itemsProcessed.incrementAndGet();
final int arrivalPhase = latch.arrive();
}
};
executor.execute( task );
}
latch.arriveAndAwaitAdvance();
final long endTime = currentTimeMillis();
out.println( "Processed " + itemsProcessed.get() + " items in " + ( endTime - startTime ) / 1000.0 + " seconds." );

关于java - 确保所有任务完成的同步对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34262405/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com