gpt4 book ai didi

java - 来自套接字的 Spark Streaming 不适用于reduce 操作

转载 作者:行者123 更新时间:2023-12-02 06:55:56 25 4
gpt4 key购买 nike

我正在尝试在我的本地计算机上运行一个简单的 Spark-Streaming 示例。
我有一个线程将 As/Bs/Cs 写入套接字:

serverSocket = new ServerSocket(Constants.PORT);
s1 = serverSocket.accept();
while(true) {
Thread.sleep(random.nextInt(100));
String character = alphabet.get(random.nextInt(alphabet.size())) ;
PrintWriter out = new PrintWriter(s1.getOutputStream());
out.println(character);
out.flush();
}

我的主程序,我尝试计算 As/Bs/C 的数量,如下所示(没有减少步骤):

public static void main(String[] args) {
// start socket writer thread
System.setProperty("spark.cleaner.ttl", "10000");
JavaSparkContext sc = new JavaSparkContext(
"local",
"Test",
Constants.SPARK_HOME,
new String[]{"target/spark-standalone-0.0.1-SNAPSHOT.jar"});
Duration batchDuration = new Duration(TIME_WINDOW_MS);
JavaStreamingContext streamingContext = new JavaStreamingContext(sc, batchDuration);
JavaDStream<String> stream = streamingContext.socketTextStream("localhost", Constants.PORT);
stream.print();
JavaPairDStream<String, Long> texts = stream.map(new PairFunction<String, String, Long>() {

@Override
public Tuple2<String, Long> call(String t) throws Exception {
return new Tuple2<String, Long>("batchCount" + t, 1l);
}

});
texts.print();
streamingContext.checkpoint("checkPointDir");
streamingContext.start();

在这种情况下,一切正常(批处理的示例输出):

Time: 1372413296000 ms
-------------------------------------------
B
A
B
C
C
C
A
B
C
C
...

-------------------------------------------
Time: 1372413296000 ms
-------------------------------------------
(batchCountB,1)
(batchCountA,1)
(batchCountB,1)
(batchCountC,1)
(batchCountC,1)
(batchCountC,1)
(batchCountA,1)
(batchCountB,1)
(batchCountC,1)
(batchCountC,1)
...

但是如果我在 map 之后添加缩减步骤,它就不再起作用了。此代码位于texts.print()之后

JavaPairDStream<String, Long> reduced = texts.reduceByKeyAndWindow(new Function2<Long, Long, Long>() {

@Override
public Long call(Long t1, Long t2) throws Exception {
return t1 + t2;
}
}, new Duration(TIME_WINDOW_MS));
reduced.print();

在这种情况下,我只得到第一个“stream”变量和“texts”变量的输出,而没有得到任何reduce 的输出。在第一个批处理之后也没有任何反应。我还将 Spark 日志级别设置为 DEBUG 但没有遇到任何异常或其他奇怪的事情。

这里发生了什么?为什么我会被锁定?

最佳答案

仅供记录:我在 Spark 用户组中得到了答案。
错误是必须使用

"local[2]"

而不是

"local"

作为参数来实例化 Spark 上下文,以启用并发处理。

关于java - 来自套接字的 Spark Streaming 不适用于reduce 操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17362233/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com