gpt4 book ai didi

java - Scala vs Java Streaming : Scala prints nothing, Java 工作

转载 作者:搜寻专家 更新时间:2023-11-01 03:34:32 26 4
gpt4 key购买 nike

我正在使用 akka-stream 对 Scala 与 Java Reactive Spec 实现进行比较和 RxJava , 分别。我的用例是一个简单的 grep:给定一个目录、一个文件过滤器和一个搜索文本,我在该目录中查找所有包含该文本的匹配文件。然后我流式传输 (filename -> matching line) 对。这适用于 Java,但对于 Scala,什么也没有打印。也不异常(exception),但也没有输出。测试数据是从互联网上下载的,但正如您所见,代码也可以很容易地在任何本地目录下进行测试。

斯卡拉:

object Transformer {
implicit val system = ActorSystem("transformer")
implicit val materializer = ActorMaterializer()
implicit val executionContext: ExecutionContext = {
implicitly
}

import collection.JavaConverters._

def run(path: String, text: String, fileFilter: String) = {
Source.fromIterator { () =>
Files.newDirectoryStream(Paths.get(path), fileFilter).iterator().asScala
}.map(p => {
val lines = io.Source.fromFile(p.toFile).getLines().filter(_.contains(text)).map(_.trim).to[ImmutableList]
(p, lines)
})
.runWith(Sink.foreach(e => println(s"${e._1} -> ${e._2}")))
}
}

Java:

public class Transformer {
public static void run(String path, String text, String fileFilter) {
Observable.from(files(path, fileFilter)).flatMap(p -> {
try {
return Observable.from((Iterable<Map.Entry<String, List<String>>>) Files.lines(p)
.filter(line -> line.contains(text))
.map(String::trim)
.collect(collectingAndThen(groupingBy(pp -> p.toAbsolutePath().toString()), Map::entrySet)));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}).toBlocking().forEach(e -> System.out.printf("%s -> %s.%n", e.getKey(), e.getValue()));
}

private static Iterable<Path> files(String path, String fileFilter) {
try {
return Files.newDirectoryStream(Paths.get(path), fileFilter);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}

单元测试使用 Scala 测试:

class TransformerSpec extends FlatSpec with Matchers {
"Transformer" should "extract temperature" in {
Transformer.run(NoaaClient.currentConditionsPath(), "temp_f", "*.xml")
}

"Java Transformer" should "extract temperature" in {
JavaTransformer.run(JavaNoaaClient.currentConditionsPath(false), "temp_f", "*.xml")
}
}

最佳答案

该死,我忘了 Source 返回一个 Future,这意味着流从未运行过。 @MrWiggles 的评论给了我一个提示。以下 Scala 代码产生与 Java 版本相同的结果。

注意:我问题中的代码没有关闭 DirectoryStream,对于包含大量文件的目录,它会导致 java.io. IOException:系统中打开的文件过多。下面的代码正确地关闭了资源。

def run(path: String, text: String, fileFilter: String) = {
val files = Files.newDirectoryStream(Paths.get(path), fileFilter)

val future = Source(files.asScala.toList).map(p => {
val lines = io.Source.fromFile(p.toFile).getLines().filter(_.contains(text)).map(_.trim).to[ImmutableList]
(p, lines)
})
.filter(!_._2.isEmpty)
.runWith(Sink.foreach(e => println(s"${e._1} -> ${e._2}")))

Await.result(future, 10.seconds)

files.close

true // for testing
}

关于java - Scala vs Java Streaming : Scala prints nothing, Java 工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35221374/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com