gpt4 book ai didi

MapUntil 的 Java 8 Fork 流

转载 作者:塔克拉玛干 更新时间:2023-11-02 08:38:15 25 4
gpt4 key购买 nike

我最近开始玩 java 流。现在我想到了

Stream<T> mapUntil(Stream<T> in, Function<T,T> mapFunc, Predicate<Stream<T>> predicate)

或更一般的

Stream<T> applyUntil(Stream<T> in, Function<Stream<T>,Stream<T>> func,
Predicate<Stream<T>> predicate)

和他们天真的实现

Stream<T> mapUntil(Stream<T> in, Function<T,T> mapFunc, Predicate<Stream<T>> predicate){
return applyUntil(in,in->in.map(mapFunc),predicate)
}
Stream<T> applyUntil(Stream<T> in, Function<Stream<T>,Stream<T>> func,
Predicate<Stream<T>> predicate){
if(predicate.test(in)) return in;
return applyUntil(func.apply(in),func,predicate);
}

遗憾的是,mapUntil(stream,mapFunc,s->s.anyMatch(predicate)) 导致 IllegalStateException: Stream has already been operationed on or closed,这是合乎逻辑,因为我在同一流上调用了 anyMatchmap。所以我为 applyUntil 想出了一个不同的实现:

Stream<T> applyUntil(Stream<T> in, Function<Stream<T>,Stream<T>> func,
Predicate<Stream<T>> predicate){
List<T> collected = in.collect(Collectors.toList());
if(predicate.test(collected.stream())) return collected.stream()
return applyUntil(func.apply(collected.stream(),func,predicate);
}

这显然有很多问题。

  • 它不适用于无限(或非常大)的流。 在我的特殊情况下,这是可以接受的,但是对于这样一个通用的方法来说,声称这一点感觉很糟糕
  • 它违背了流的意图,因为所有的惰性都消失了——由于 collect(Collectors.toList())
  • ,每个数据都必须计算和存储

我试图修改我的代码以消除第二个问题,重写applyUntil:

Stream<T> applyUntil(Stream<T> in, Function<Stream<T>,Stream<T>> func,
Predicate<Stream<T>> predicate){
List<T> collected = in.collect(Collectors.toList());
return applyUntil(()->collected.stream(),func,predicate);
}
Stream<T> applyUntil(Supplier<Stream<T>> sup, Function<Stream<T>,Stream<T>> func,
Predicate<Stream<T>> pred){
if(predicate.test(sup.get())) return sup.get();
return applyUntil(()->func.apply(sup.get()),func,predicate);
}

这个实现确实有效 - 但速度非常慢,尤其是当你有一个非常昂贵的功能时。当我仔细观察它时,我明白了原因:它正在调用 predicate.test(collected.stream()),predicate.test(func.apply(collected.stream()))predicate.test(func.apply(func.apply(collected.stream())) 等等,导致 O(n^2) 调用func,与 n 需要的调用相比。这不好。

在我天真的世界里,应该有比这两者更好的解决方案。类似这样的东西(只是一个快速草稿,AddFirst - Exists 是 MyStream 的简单惰性实现。我在默认 java Streams 的代码末尾错过了像我的 Fork 这样的类):

interface MyStream<T>{
T get();
boolean hasNext();
}
class Convert<T> implements MyStream<T>{
Iterator<T> inner;
pulic Convert(Interator<T> iter){
inner=iter;
}
public boolean hasNext(){
return inner.hasNext();
}
public T get(){
return inner.get();
}
class AddFirst<T> implements MyStream<T>{
T item;
MyStream<T> inner;
boolean used;
public AddFirst(T t, MyStream<T> prev){
item=t;
inner=prev;
used=false;
}
public T get(){
if(used) return inner.get();
used=true;
return item;
}
public boolean hasNext(){
return !used || inner.hasNext();
}

}
class Filter<T> implements MyStream<T>{
Predicate<T> filter;
MyStream<T> inner
public Filter(Predicate<T> test, MyStream<T> prev){
filter=test;
inner=prev;
}
public T get(){
while(true){
T curr = inner.get(); //if !inner.hasNext, this throws NoSuchElementException
if(filter.test(curr)) return curr;
}
}
public boolean hasNext(){
try{
T item = get();
inner = new AddFirst(item,inner);
return true;
}
catch(NoSuchElementException e){
return false;
}
}
}
class Map<K,T> implements MyStream<T>{
MyStream<K> inner;
Function<K,T> func;
public Map(Function<K,T> func,MyStream<K> prev){
this.func=func;
inner = prev;
}
public T get(){
return func.apply(inner.get());
}
public boolean hasNext(){
return inner.hasNext();
}
}
class Forall<T> implements Predicate<MyStream<T>>{
Predicate<T> pred;
public Forall(Predicate<T> func){
pred=func;
}
public boolean test(MyStream<T> ms){
while(ms.hasNext()){
if(!pred.test(ms.get()) return false;
}
return true;
}
}
class Exists<T> implements Predicate<MyStream<T>>{
Predicate<T> pred;
public Forall(Predicate<T> func){
pred=func;
}
public boolean test(MyStream<T> ms){
while(ms.hasNext()){
if(pred.test(ms.get()) return true;
}
return false;
}
}
class Fork<T>{
Deque<T> advance;
MyStream<T> inner;
boolean ahead;
MyStream<T> master;
MyStream<T> slave;
public Fork(MyStrem<T> prev){
inner=prev;
advance= new LinkedList<T>();
ahead=false;
master = new ForkStream(true);
slave = new ForkStream(false);
}
public MyStream<T> getMaster(){
return master;
}
public Iterator<T> getMasterIter(){
return master;
}
public MyStream<T> getSlave(){
return slave;
}
public Iterator<T> getSlaveIter(){
return slave;
}
class ForkStream implements MyStream<T>, Iterator<T>{
boolean role;
public ForkStream(boolean in){
role=in;
}
public T get(){
if(role==ahead||advance.size()==0){
ahead=role;
T item = inner.get();
advance.addLast(item);
return item;
}
else{
return advance.removeFirst();
}
}
public boolean hasNext(){
return (role!=ahead&&advance.size()!=0) || inner.hasNext();
}
public T next(){
return get();
}
}
}

有了这些类,我可以将我的方法重写为:

Stream<T> applyUntil(Stream<T> in, Function<Stream<T>,Stream<T>> func,
Predicate<Stream<T>> predicate){
Fork<T> fork = new Fork(new Convert<T>(in.iterator()));
Stream<T> master = StreamSupport.stream(
Spliterators.spliteratorUnknownSize(fork.getMasterIter(),0),false);
Stream<T> slave = StreamSupport.stream(
Spliterators.spliteratorUnknownSize(fork.getSlaveIter(),0),false);
if(predicate.test(master)) return slave
return applyUntil(func.apply(slave,func,predicate);
}

它确实适用于无限流,它仍然是惰性的,它会重用计算值。在我看来,这就像适合各种用途的一体化设备。

编辑: 当我试图解释为什么最后一个代码块无法编译时,我找到了一种使其编译的方法。它仍然不是很好,失去了很多流魔法,不是线程保存等。此外,MyStream 应该有一个 close 方法,如果您不再对任何数据感兴趣 - 所以 Fork 不必为您保存。所以另一个问题出现在我的脑海中:你能从类似于迭代器的东西创建一个 j.u.stream.Stream 来告诉它的迭代器它不再对任何数据感兴趣(因为短路)吗?

所以我的问题是:没有外部库的 JDK8 是否有像我的 Fork 一样的东西,保持更多的魔力?如果是:哪个类/方法可以帮助我?如果否:为什么不呢?并且:你怎么能自己实现它,让尽可能多的魔法保持活力?

感谢阅读,对于这么长的文字感到抱歉:/
亚历克斯

最佳答案

Java 9 将有 takeWhiledropWhile .将这些与 Stream.concat 结合起来你可以做到

Stream.concat(
sourceCollection.stream().takeWhile(predicate).map(mapper),
sourceCollection.stream().dropWhile(predicate.negate())
)

这不会为中间集合消耗额外的内存,但会通过两次遍历前缀来消耗 CPU 时间,除非它可以在到达第二个流之前短路。

为了获得更高效的解决方案,您可以通过提取 stream.spliterator() 来实现中间有状态操作,例如您的条件映射函数。 ,将其包装到 Spliterator 的自定义子类中- 或 j.u.Spliterators.AbstractSpliterator如果你懒于实现并行支持 - 然后使用 j.u.s.StreamSupport.stream(Spliterator<T>, boolean)将其包装回流中。

关于MapUntil 的 Java 8 Fork 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37891144/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com