gpt4 book ai didi

java - 如何使对数组的写入对其他线程可见

转载 作者:行者123 更新时间:2023-11-30 06:52:17 25 4
gpt4 key购买 nike

我有一个基本类型 int 的输入数组,我想使用多个线程处理这个数组并将结果存储在一个相同类型和大小的输出数组中。以下代码在内存可见性方面是否正确?

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ArraySynchronization2
{
final int width = 100;
final int height = 100;

final int[][] img = new int[width][height];
volatile int[][] avg = new int[width][height];

public static void main(String[] args) throws InterruptedException, ExecutionException
{
new ArraySynchronization2().doJob();;
}

private void doJob() throws InterruptedException, ExecutionException
{
final int threadNo = 8;
ExecutorService pool = Executors.newFixedThreadPool(threadNo);

final CountDownLatch countDownLatch = new CountDownLatch(width - 2);

for (int x = 1; x < width - 1; x++)
{
final int col = x;
pool.execute(new Runnable()
{
public void run()
{
for (int y = 0; y < height; y++)
{
avg[col][y] = (img[col - 1][y] + img[col][y] + img[col + 1][y]) / 3;
}
// how can I make the writes to the data in avg[][] visible to other threads? is this ok?
avg = avg;
countDownLatch.countDown();
};
});
}

try
{
// Does this make any memory visibility guarantees?
countDownLatch.await();
}
catch (InterruptedException e)
{
e.printStackTrace();
}

// can I read avg here, will the results be correct?
for (int x = 0; x < width; x++)
{
for (int y = 0; y < height; y++)
{
System.out.println(avg[x][y]);
}
}

pool.shutdown();
pool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);

// now I know tasks are completed and results synchronized (after thread death), but what if I plan to reuse the pool?
}
}

我不想在 CountDownLatch 上同步。我想知道如何使对输出数组的写入对其他线程可见。假设我有一个我想处理的数组(例如图像),我可以在多个单独的任务中执行此操作,这些任务将输入数组的 block 处理成输出数组,写入之间没有相互依赖关系输出。所有计算完成后,我希望输出数组中的所有结果都准备好读取。我怎样才能实现这种行为?我知道可以通过使用 submit 和 Future.get() 而不是 execute 来实现,我想知道如何正确实现这种低级机制?另请引用代码附近评论中提出的问题。

最佳答案

嗯,只是想知道您是否真的需要闩锁。数组本身是内存中的一个保留 block ,每个单元格都是一个专用的内存地址。 (顺便说一句。将其标记为 volatile 只会将数组的 reference 标记为 volatile,而不是数组的单元格,请参阅 here )。因此,仅当多个线程写入访问同一单元格时,您才需要协调对单元格的访问。

问题是,你真的在​​这样做吗?或者目标应该是:尽可能避免协调访问,因为这是有代价的。

在您的算法中,您对行进行操作,那么为什么不对行进行并行化,以便每个线程只读取和计算整个数组的行段的值而忽略其他行?

  • thread-0 -> 第 0、8、15 行,...
  • thread-1 -> 第 1、9、16 行,...
  • ...

基本上是这样的(还没有测试过):

for (int n = 0; n < threadNo; n++)  { //each n relates to a thread
pool.execute(new Runnable() {
public void run() {
for (int row = n; row < height; row += threadNo) { //proceed to the next row for the thread
for (int col = 1; col < width-1; col++) {
avg[col][row] = (img[col - 1][row] + img[col][row] + img[col + 1][row]) / 3;
}
}
};
});
}

这样他们就可以对整个数组进行操作,而根本不需要同步。通过在关闭池后放置循环打印出结果将确保所有计算线程都已完成,唯一需要等待的线程是主线程。

此方法的替代方法是为每个线程创建一个大小为 100/ThreadNo 的平均数组,以便每个线程对其进行写操作在数组上,然后您将数组与 合并>System.arraycopy() 到一个数组中。

如果您打算重用池,您应该使用提交 而不是执行并在Futures 上调用get()你从提交中得到。

Set<Future> futures = new HashSet<>();
for(int n = 0; ...) {
futures.add(pool.submit(new Runnable() {...}));
}

for(Future f : futures) {
f.get(); //blocks until the task is completed
}

如果你想读取数组的中间状态,你可以直接读取它,如果单个单元格上的不一致数据是可以接受的,或者使用 AtomicIntegerArray ,正如 Nicolas Filotto 所建议的那样。

-- 编辑--

在使用锁存器的宽度而不是原始线程数的编辑和所有讨论之后,我想补充几句话。

正如@jameslarge 指出的那样,它是关于如何建立“先于发生”关系,或者如何保证操作 A(即写入)发生在操作 B(即读取)之前。因此需要协调两个线程之间的访问。有几种选择

  • volatile 关键字 - 不适用于数组,因为它只标记引用,而不标记值是易变的
  • 同步——悲观锁(synchronized修饰符或语句)
  • CAS - 乐观锁定,被很多并发实现使用

然而,每个同步点(悲观或乐观)都会建立一个发生在之前的关系。你选择哪一个,取决于你的要求。

你想要实现的是主线程的读操作和工作线程的写操作之间的协调。您如何实现取决于您和您的要求。 CountDownLatch 对作业总数进行倒计时是一种方式(顺便说一句,锁存器使用状态属性,它是一个 volatile int)。 CyclicBarrier也可能是一个值得考虑的构造,特别是如果您想读取一致的中间状态。或 future.get(),或...一切都归结为工作线程必须发出他们已完成写入的信号,以便读取线程可以开始读取。

但是请注意使用 sleep 而不是同步。 sleep 不建立 happens before 关系,使用 sleep 进行同步是典型的并发 bug 模式。 IE。在最坏的情况下, sleep 会在任何工作完成之前执行。

关于java - 如何使对数组的写入对其他线程可见,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39202361/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com