gpt4 book ai didi

java - 如何在 Reactor 3 中编写运算符?

转载 作者:行者123 更新时间:2023-11-30 06:32:49 24 4
gpt4 key购买 nike

我通过实现 org.reactivestreams.Publisher 来实现 Reactor 运算符,如下所示。然而,我想知道这是否是使用 Reactor 的正确方法。手动实现订阅者看起来有点麻烦。还有Operators类在这方面似乎没有帮助。

class MyOperator implements Publisher<Integer> {

private final Publisher<Integer> source;

public MyOperator(Publisher<Integer> source) {
this.source = source;
}

@Override
public void subscribe(Subscriber<? super Integer> target) {
final Subscriber<Integer> wrappedSubscriber = createSubscriber(target);
source.subscribe(wrappedSubscriber);
}

private Subscriber<Integer> createSubscriber(Subscriber<? super Integer> target) {
return new Subscriber<Integer>() {
private Subscription subscription;

@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(Long.MAX_VALUE);
}

@Override
public void onNext(Integer integer) {
target.onNext(integer + 1); // actual behaviour
subscription.request(Long.MAX_VALUE);
}

@Override
public void onError(Throwable t) {
target.onError(t);
}

@Override
public void onComplete() {
target.onComplete();
}
};
}
}

或者以下示例是正确的方法™吗?

class MyCompactOperator implements Publisher<Integer> {

final Flux<Integer> flux;

public MyCompactOperator(Publisher<Integer> source) {
flux = Flux.from(source).map(number -> number + 1);
}

@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
flux.subscribe(subscriber);
}
}

至少这需要更少的代码。

按照 Simon Baslé 的建议,变体 3 以 Flux 作为源:

class MyFluxOperator extends Flux<Integer> {

private final Flux<Integer> source;

public MyFluxOperator(Flux<Integer> source) {
this.source = source;
}

@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
source.map(number -> number + 1).subscribe(subscriber);
}
}

所有实现都按预期工作:

Flux<Integer> source = Flux.just(1, 2, 3, 4, 5);
Flux.from(new MyOperator(source)).subscribe(System.out::println);

// for variant 3
new MyFluxOperator(source).subscribe(System.out::println);

我在第二行中使用了 Flux 以避免另一个订阅者的实现。

输出:

2
3
4
5
6

问题:

  • 我的实现中是否缺少某些内容?
  • 是否有更好的方法(使用更少的代码、更好的错误处理或其他方式)在 Reactor 3 中实现运算符?
  • 两种方法之间是否存在相关的功能或非功能差异?

最佳答案

看到你的第二个选项,你似乎认为你必须实现一个发布者。事实绝对不是这样(相反)。从reactor Flux源(或Publisher + Reactor的Flux.from)开始,然后简单地链接到map

编辑:为了澄清您不想创建任何类,只需在主代码路径中执行此操作即可:

  • 如果您的已经是FluxMono:

    Flux<Integer> incrementedSource = source.map(i -> i + 1);
    incrementedSource.subscribe(subscriber);
  • 如果您的来源是另一种发布者:

    Flux<Integer> incrementedSource = Flux.from(source).map(i -> i + 1);
    incrementedSource.subscribe(subscriber);

像 Reactor 这样的库的整体理念是为您提供可以直接组合的运算符,而无需编写 Publisher

如果您想要一种相互化代码的方法,因为您经常将一组运算符应用于各种 Flux,请查看 transformcompose >(以及 reference documentation )。

关于java - 如何在 Reactor 3 中编写运算符?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45739200/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com