gpt4 book ai didi

java - 在 java 9 Flow 上以一种只有一个订阅者会使用它的方式将数据发布给订阅者

转载 作者:搜寻专家 更新时间:2023-11-01 02:36:23 24 4
gpt4 key购买 nike

有没有办法以只有一个订阅者可以接收的方式向订阅者发布数据?我想要实现的是,订阅者发布者模型将作为一个队列工作,该队列具有多个读者但只有一个发布者。一旦发布者发布数据,第一个接收数据的订阅者将是唯一一个处理数据的订阅者。

提前致谢!!!

最佳答案

在 react 流中(至少,在它们的 java.util.concurrent.Flow 化身中),订阅者只需要数据,只有发布者可以控制如何发布该数据。

Java 9 中Flow.Publisher 的唯一通用实现是 SubmissionPublisher,它遵循标准的发布/订阅方式向所有订阅者发布任何已发布的项目.我没有找到任何简单的方法来破解 SubmissionPublisher 以使其只发布给一个订阅者。

但是您可以尝试编写自己的 Flow.Publisher 实现,如下所示:

class QueueLikePublisher<T> implements Publisher<T> {
private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
private List<QueueLikeSubscription<? super T>> subscriptions = new CopyOnWriteArrayList<>();

public synchronized void subscribe(Subscriber<? super T> subscriber) {
// subscribing: adding a new subscription to the list
QueueLikeSubscription<? super T> subscription = new QueueLikeSubscription<>(subscriber, executor);
subscriptions.add(subscription);
subscriber.onSubscribe(subscription);
}

public void submit(T item) {
// we got some data: looking for non-completed and demanding
// subscription and give it the data item

for (QueueLikeSubscription<? super T> subscription : subscriptions) {
if (!subscription.completed && subscription.demand > 0) {
subscription.offer(item);
// we just give it to one subscriber; probaly offer() call needs
// to be wrapped in a try/catch
break;
}
}
}

static class QueueLikeSubscription<T> implements Subscription {
private final Subscriber<? super T> subscriber;
private final ExecutorService executor;
volatile int demand = 0;
volatile boolean completed = false;

QueueLikeSubscription(Subscriber<? super T> subscriber,
ExecutorService executor) {
this.subscriber = subscriber;
this.executor = executor;
}

public synchronized void request(long n) {
if (n != 0 && !completed) {
if (n < 0) {
IllegalArgumentException ex = new IllegalArgumentException();
executor.execute(() -> subscriber.onError(ex));
} else {
// just extending the demand
demand += n;
}
}
}

public synchronized void cancel() {
completed = true;
}

Future<?> offer(T item) {
return executor.submit(() -> {
try {
subscriber.onNext(item);
} catch (Exception e) {
subscriber.onError(e);
}
});
}
}
}

它将尚未完成(例如,已取消)且需求非零的项目发布给第一个订阅者。

请注意,此代码只是用于说明想法的大纲。例如,它可能应该包含更多异常处理(例如处理 RejectedExecutionException)。

关于java - 在 java 9 Flow 上以一种只有一个订阅者会使用它的方式将数据发布给订阅者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50112809/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com