gpt4 book ai didi

java - 如何在Java中创建阻塞线程安全的Stream?

转载 作者:太空宇宙 更新时间:2023-11-04 09:40:00 24 4
gpt4 key购买 nike

我想创建一个在 terminal actions 上阻塞的 java.util.stream.Stream并使用任意对象进行同步。 Stream 的方法本身必须以透明的方式执行此操作,以便我可以安全地将 Stream 传递给不知道同步的代码。

考虑以下示例:

void libraryMethod(Stream<Whatever> s) {
for (int i = 0; i < 10000000; ++i) { /* ... */ }
s.filter(Library::foo).forEach(Library::bar);
}

/* Elsewhere in my code */

Set<Whatever> aSet = Collections.synchronizedSet(...);
/* ... */
libraryMethod(new MyBlockingStream<>(set.stream(), set));

在执行 forEach 之前,我希望 MyBlockingStream 本身获取 aSet 的锁,并仅在 forEach 终止时释放。这应该保证我不会收到 ConcurrentModificationException ,因为其他线程可能想要修改该集合。我无法在整个 libraryMethod 上使用 synchronized (aSet),因为这会阻塞 aSet 的时间比需要的时间长得多。

可以这样做吗?如果是这样,是否有任何现有的实现可以做到这一点,或者我必须自己编写它?

注意:这个问题与 Stream 如何执行操作无关 - 我不在乎它是否并行。我知道存在本质上不可同步的 iterator()spliterator() 方法。我也不关心他们。

最佳答案

你可以使用锁。

public class Example {
public static ReentrantLock lock = new ReentrantLock();

private static void sleep() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private static Runnable createRunnable() {
return () -> {
try {
Arrays.asList("a", "b", "c").stream().forEach(e -> {
if (!lock.isHeldByCurrentThread())
lock.lock();

sleep();
System.out.println(String.format("thread %s with element %s", Thread.currentThread().getId(), e));
});
} finally {
if(lock.isHeldByCurrentThread())
lock.unlock();
}
};
}

public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(createRunnable());
Thread t2 = new Thread(createRunnable());
Thread t3 = new Thread(createRunnable());

t1.start();
t2.start();
t3.start();

System.out.println("join all threads");
t1.join();
t2.join();
t3.join();
}
}

首先到达forEach的线程将锁定所有其他线程。

本例中的输出为

join all threads
thread 16 with element a
thread 16 with element b
thread 16 with element c
thread 15 with element a
thread 15 with element b
thread 15 with element c
thread 14 with element a
thread 14 with element b
thread 14 with element c

编辑1

正如 @Holger 指出的,如果流被嵌套,这将不起作用。内部流会过早释放锁。

关于java - 如何在Java中创建阻塞线程安全的Stream?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56090665/

24 4 0
文章推荐: html - "position: relative"到底是什么意思?
文章推荐: javascript - 如何仅使用 jquery 制作动态进度条?
文章推荐: jquery - 根据主题更改 css 并删除以前的主题 css
文章推荐: angularjs - Google Chrome HTML