gpt4 book ai didi

java - 如何让 Spark Streaming 在单元测试中计算文件中的单词数?

转载 作者:搜寻专家 更新时间:2023-10-31 20:27:53 24 4
gpt4 key购买 nike

我已经成功地用 Java 构建了一个基于 HdfsCount example in Scala 的非常简单的 Spark Streaming 应用程序.

当我将此应用程序提交到本地 Spark 时,它会等待将文件写入给定目录,当我创建该文件时,它会成功打印字数。我按 Ctrl+C 终止应用程序。

现在我尝试为此功能创建一个非常基本的单元测试,但在测试中我无法打印相同的信息,即字数。

我错过了什么?

下面是单元测试文件,之后我还包含了显示 countWords 方法的代码片段:

StarterAppTest.java

import com.google.common.io.Files;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;


import org.junit.*;

import java.io.*;

public class StarterAppTest {

JavaStreamingContext ssc;
File tempDir;

@Before
public void setUp() {
ssc = new JavaStreamingContext("local", "test", new Duration(3000));
tempDir = Files.createTempDir();
tempDir.deleteOnExit();
}

@After
public void tearDown() {
ssc.stop();
ssc = null;
}

@Test
public void testInitialization() {
Assert.assertNotNull(ssc.sc());
}


@Test
public void testCountWords() {

StarterApp starterApp = new StarterApp();

try {
JavaDStream<String> lines = ssc.textFileStream(tempDir.getAbsolutePath());
JavaPairDStream<String, Integer> wordCounts = starterApp.countWords(lines);

ssc.start();

File tmpFile = new File(tempDir.getAbsolutePath(), "tmp.txt");
PrintWriter writer = new PrintWriter(tmpFile, "UTF-8");
writer.println("8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin");
writer.close();

System.err.println("===== Word Counts =======");
wordCounts.print();
System.err.println("===== Word Counts =======");

} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}


Assert.assertTrue(true);

}

}

此测试编译并开始运行,Spark Streaming 在控制台上打印了很多诊断消息,但对 wordCounts.print() 的调用没有打印任何内容,而在 StarterApp.java 本身中,他们这样做。

我也尝试在 ssc.start() 之后添加 ssc.awaitTermination(); 但在这方面没有任何改变。之后,我还尝试在此 Spark Streaming 应用程序正在检查的目录中手动创建一个新文件,但这次它给出了一个错误。

为了完整起见,下面是 wordCounts 方法:

public JavaPairDStream<String, Integer> countWords(JavaDStream<String> lines) {
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) { return Lists.newArrayList(SPACE.split(x)); }
});

JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) { return new Tuple2<>(s, 1); }
}).reduceByKey((i1, i2) -> i1 + i2);

return wordCounts;
}

最佳答案

几点建议:

  • 为 SparkStreaming 上下文提供至少 2 个核心。 1 个用于 Streaming,1 个用于 Spark 处理。 “本地”->“本地[2]”
  • 您的流式传输间隔为 3000 毫秒,因此您需要在程序中的某处等待 - 至少 - 那个时间才能期待输出。
  • Spark Streaming 需要一些时间来设置监听器。在发出 ssc.start 后立即创建该文件。不保证文件系统监听器已经到位。我会在 ssc.start
  • 之后做一些 sleep(xx)

在流媒体中,一切都与正确的时机有关。

关于java - 如何让 Spark Streaming 在单元测试中计算文件中的单词数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27356157/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com