gpt4 book ai didi

java - 使用通过 Flow API 实现的处理器转换数据流

转载 作者:塔克拉玛干 更新时间:2023-11-03 03:54:11 26 4
gpt4 key购买 nike

我正在浏览 Community#DOC-1006738来自 Oracle 的有关 Flow.Publisher 的并发概念和 Flow.Subscriber .上面可以找到Sample code to transform data stream using processor有这两行代码,让我有点疑惑。

//Create Processor and Subscriber  
MyFilterProcessor<String, String> filterProcessor =
new MyFilterProcessor<>(s -> s.equals("x"));

问题 1. MyFilterProcessor 怎么可能是 <String, String> 类型?这里?

我最初的想法是,这些可能是 <String, Boolean>相反,但这将违背下一行中订阅者定义的进一步定义:-

MyTransformProcessor<String, Integer> transformProcessor = 
new MyTransformProcessor<>(s -> Integer.parseInt(s));

补充说明,除非我明确地将以上内容转换(更正)为

MyTransformProcessor<String, Integer>(s -> Integer.parseInt(s))

我在 parseInt 中遇到错误 阅读,不能应用于Object

-- 为什么我需要在此处显式转换 RHS? --


虽然代码主要出现在共享链接中,但我使用的有用的构造函数定义是

public class MyTransformProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {       
private Function function;
MyTransformProcessor(Function<? super T, ? extends R> function) {
super();
this.function = function;
}
...
}

filterProcessor 相同的一个作为:-

public class MyFilterProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
private Function function;
MyFilterProcessor(Function<? super T, ? extends R> function) {
super();
this.function = function;
}
...
}

问题。现在有了这些更改(一个在解决问题 1 之后,另一个来自附加注释),如何才能正确实现示例?或者我只是错过了一些非常基本的东西?

最佳答案

我认为您的主要错误是实现 MyFilterProcessor作为 MyTransformProcessor 的(几乎)精确副本.

由于作者没有贴出上述类的代码,我尝试根据以下内容猜测其行为:

... = new MyFilterProcessor<>(s -> s.equals("x"));

名字Filter建议该组件仅接受并重新发布某些值。此时计算结果为 boolean 的函数(或 Predicate<T> )在上下文中是完全可以接受的(因此 s -> s.equals("x") )。

页面末尾的初始数据流

String[] items = {"1", "x", "2", "x", "3", "x"};  

似乎证实了我的假设。作者只是想过滤掉"x"值,这个任务交给了MyFilterProcessor在将其发布到管道的其余部分之前必须评估每种类型;并且输出类型必须与输入类型相同。


构造函数应该如下所示:

MyFilterProcessor(Predicate<? super R> predicate) { /* ... */ }
// or
MyFilterProcessor(Function<? super R, Boolean> function) { /* ... */ }

onNext应该只转发某些元素:

if (! predicate.test(item)) {
int max = submit(item); // get the estimated maximum lag
subscription.request(max);
}

对于 MyFilterProcessor 的定义我有两个想法:

  • 1) public class MyFilterProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<R, R>

作为Flow.Processor意味着接受和转发相同的类型。

我就是不适合这个类型 T任何地方。这就是我被阻止的地方。

  • 2) public class MyFilterProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R>

然后,在 onNext , 你必须投 <T><R> (丑,非常丑)

if (! predicate.test(item)) {
int max = submit( (R) item);
subscription.request(max);
}

您将测试 Predicate<? super T>在这种情况下。

如果你愿意重构一点,如SubmissionPublisher已经继承了 Flow.Publisher 的行为你可以让类只实现 Flow.Subscriber :

public class MyFilterProcessor<R> extends SubmissionPublisher<R> implements Flow.Subscriber<R>

等等

MyFilterProcessor<String, String> filterProcessor = new MyFilterProcessor<>(s -> s.equals("x"));
// or, if you follow my example:
MyFilterProcessor<String> filterProcessor = new MyFilterProcessor<>(s -> s.equals("x"));

终于成功了。


如果打印 MyFilterProcessor 内的值和 MySubscriber你会得到这个输出:

Publishing Items...
FilterProcessor: Receiving: 1
FilterProcessor: Receiving: x
FilterProcessor: Receiving: 2
FilterProcessor: Receiving: x
FilterProcessor: Receiving: 3
FilterProcessor: Receiving: x
Got: 1
Got: 2
Got: 3

这是预期的结果。


测试时,记得等待管道完成后再退出应用程序 SubmissionPublisher在另一个 Thread 中发布元素.

另外,请与文章相反,有常识去改

private Function function; 
// ...
submit((R) function.apply(item));

private Function<? super T, ? extends R> function;
// ...
submit(function.apply(item));

Why do I need to explicitly cast the RHS here?

我仍在努力理解你是如何得到 cannot be applied to Object 的错误。哪个您使用的编号和 IDE?

关于java - 使用通过 Flow API 实现的处理器转换数据流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48977079/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com