gpt4 book ai didi

Flow SubmissionPublisher 提供方法的 Java 9 行为

转载 作者:搜寻专家 更新时间:2023-10-31 19:36:54 25 4
gpt4 key购买 nike

我一直在玩 Java Flow offer 运算符,但在阅读文档并进行测试后我不明白。

这是我的测试

@Test
public void offer() throws InterruptedException {
//Create Publisher for expected items Strings
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
//Register Subscriber
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
publisher.offer("item", (subscriber, value) -> false);
Thread.sleep(500);
}

offer 运算符接收要发出的项目和 BiPredicate 函数,据我阅读文档的理解,只有在谓词函数为真时才会发出项目。

Bur通过测试后的结果是

Subscription done:
Subscription done:
Subscription done:
Got : item --> onNext() callback
Got : item --> onNext() callback
Got : item --> onNext() callback

如果我返回 true 而不是 false,结果没有变化。

任何人都可以向我解释这个运算符好一点。

最佳答案

不,谓词函数用于决定是否重试 docs 中提到的发布操作:

onDrop - if non-null, the handler invoked upon a drop to a subscriber, with arguments of the subscriber and item; if it returns true, an offer is re-attempted (once)

它不影响是否最初发送该项目。

编辑:使用 offer 方法时如何发生掉落的示例

我提出了一个示例,说明在调用 offer 方法时如何发生掉落。我不认为输出是 100% 确定性的,但是当它运行几次时会有明显的区别。您可以将处理程序更改为返回 true 而不是 false,以查看重试如何减少由于缓冲区饱和而导致的丢弃。在此示例中,丢弃通常会发生,因为最大缓冲区容量明显很小(传递给 SubmissionPublisher 的构造函数)。但是,当在一小段 hibernate 期后启用重试时,掉落将被移除:

public class SubmissionPubliserDropTest {

public static void main(String[] args) throws InterruptedException {
// Create Publisher for expected items Strings
// Note the small buffer max capacity to be able to cause drops
SubmissionPublisher<String> publisher =
new SubmissionPublisher<>(ForkJoinPool.commonPool(), 2);
// Register Subscriber
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
// publish 3 items for each subscriber
for(int i = 0; i < 3; i++) {
int result = publisher.offer("item" + i, (subscriber, value) -> {
// sleep for a small period before deciding whether to retry or not
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return false; // you can switch to true to see that drops are reduced
});
// show the number of dropped items
if(result < 0) {
System.err.println("dropped: " + result);
}
}
Thread.sleep(3000);
publisher.close();
}
}

class CustomSubscriber<T> implements Flow.Subscriber<T> {

private Subscription sub;

@Override
public void onComplete() {
System.out.println("onComplete");
}

@Override
public void onError(Throwable th) {
th.printStackTrace();
sub.cancel();
}

@Override
public void onNext(T arg0) {
System.out.println("Got : " + arg0 + " --> onNext() callback");
sub.request(1);
}

@Override
public void onSubscribe(Subscription sub) {
System.out.println("Subscription done");
this.sub = sub;
sub.request(1);
}

}

关于Flow SubmissionPublisher 提供方法的 Java 9 行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46502183/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com