- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我一直在玩 Java Flow offer
运算符,但在阅读文档并进行测试后我不明白。
这是我的测试
@Test
public void offer() throws InterruptedException {
//Create Publisher for expected items Strings
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
//Register Subscriber
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
publisher.offer("item", (subscriber, value) -> false);
Thread.sleep(500);
}
offer 运算符接收要发出的项目和 BiPredicate 函数,据我阅读文档的理解,只有在谓词函数为真时才会发出项目。
Bur通过测试后的结果是
Subscription done:
Subscription done:
Subscription done:
Got : item --> onNext() callback
Got : item --> onNext() callback
Got : item --> onNext() callback
如果我返回 true 而不是 false,结果没有变化。
任何人都可以向我解释这个运算符好一点。
最佳答案
不,谓词函数用于决定是否重试 docs 中提到的发布操作:
onDrop
- if non-null, the handler invoked upon a drop to a subscriber, with arguments of the subscriber and item; if it returns true, an offer is re-attempted (once)
它不影响是否最初发送该项目。
编辑:使用 offer
方法时如何发生掉落的示例
我提出了一个示例,说明在调用 offer
方法时如何发生掉落。我不认为输出是 100% 确定性的,但是当它运行几次时会有明显的区别。您可以将处理程序更改为返回 true 而不是 false,以查看重试如何减少由于缓冲区饱和而导致的丢弃。在此示例中,丢弃通常会发生,因为最大缓冲区容量明显很小(传递给 SubmissionPublisher
的构造函数)。但是,当在一小段 hibernate 期后启用重试时,掉落将被移除:
public class SubmissionPubliserDropTest {
public static void main(String[] args) throws InterruptedException {
// Create Publisher for expected items Strings
// Note the small buffer max capacity to be able to cause drops
SubmissionPublisher<String> publisher =
new SubmissionPublisher<>(ForkJoinPool.commonPool(), 2);
// Register Subscriber
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
// publish 3 items for each subscriber
for(int i = 0; i < 3; i++) {
int result = publisher.offer("item" + i, (subscriber, value) -> {
// sleep for a small period before deciding whether to retry or not
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return false; // you can switch to true to see that drops are reduced
});
// show the number of dropped items
if(result < 0) {
System.err.println("dropped: " + result);
}
}
Thread.sleep(3000);
publisher.close();
}
}
class CustomSubscriber<T> implements Flow.Subscriber<T> {
private Subscription sub;
@Override
public void onComplete() {
System.out.println("onComplete");
}
@Override
public void onError(Throwable th) {
th.printStackTrace();
sub.cancel();
}
@Override
public void onNext(T arg0) {
System.out.println("Got : " + arg0 + " --> onNext() callback");
sub.request(1);
}
@Override
public void onSubscribe(Subscription sub) {
System.out.println("Subscription done");
this.sub = sub;
sub.request(1);
}
}
关于Flow SubmissionPublisher 提供方法的 Java 9 行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46502183/
每隔一段时间我都会用特定的查询检索推文。这些推文必须传递给计算和操作这些推文的服务。所以这些服务订阅了我的出版商。所以 publisher.hasSubscribers() 返回 true。但是提交或
我一直在玩 Java Flow offer 运算符,但在阅读文档并进行测试后我不明白。 这是我的测试 @Test public void offer() throws InterruptedExcep
我正在尝试 Java 9 中的一些新功能。所以我进行了一个测试,以拥有一个发布者,以给定的速率发布数字。我还实现了一个订阅者来收听这些出版物并将它们打印到控制台。 虽然我可能不完全理解如何使用这个 A
我是一名优秀的程序员,十分优秀!