
java - Why does Spark Streaming stop working when I send two input streams?

Reposted · Author: 行者123 · Updated: 2023-11-30 02:56:29

I am developing a Spark Streaming application in which I need to use the input streams from two servers in Python, each sending one JSON message per second to the Spark context.

My problem is that if I operate on just one stream, everything works fine. But if I have two streams from different servers, Spark freezes before printing anything, and only starts working again once both servers have sent all the JSON messages they had to send (that is, when it detects that socketTextStream is not receiving data).

Here is my code:

    import org.apache.spark.api.java.StorageLevels;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import scala.Tuple2;

    // Two receiver-based input DStreams, one per socket server.
    JavaReceiverInputDStream<String> streamData1 = ssc.socketTextStream(
            "localhost", 996, StorageLevels.MEMORY_AND_DISK_SER);

    JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream(
            "localhost", 9995, StorageLevels.MEMORY_AND_DISK_SER);

    // Tag each record with the id of the stream it came from.
    JavaPairDStream<Integer, String> dataStream1 = streamData1.mapToPair(
            new PairFunction<String, Integer, String>() {
                public Tuple2<Integer, String> call(String stream) throws Exception {
                    return new Tuple2<Integer, String>(1, stream);
                }
            });

    JavaPairDStream<Integer, String> dataStream2 = streamData2.mapToPair(
            new PairFunction<String, Integer, String>() {
                public Tuple2<Integer, String> call(String stream) throws Exception {
                    return new Tuple2<Integer, String>(2, stream);
                }
            });

    dataStream2.print(); // for example

Note that there is no error message: Spark simply freezes after starting the context, and nothing is displayed even while JSON messages are arriving on the ports.

Thank you very much.

Best Answer

Take a look at these warnings from the Spark Streaming documentation and see whether they apply (a code sketch of the fix follows the list):

Points to remember

  • When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using an input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use “local[n]” as the master URL, where n > number of receivers to run (see Spark Properties for information on how to set the master).
  • Extending the logic to running on a cluster, the number of cores allocated to the Spark Streaming application must be more than the number of receivers. Otherwise the system will receive data, but not be able to process it.
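
Your code runs two socket receivers, so a master URL of “local” or “local[2]” leaves no thread free for processing and matches the freeze you describe. Here is a minimal sketch of a streaming context set up with enough threads; the app name and the 1-second batch interval are illustrative assumptions, not taken from the question:

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    // Two receivers each pin one thread, so request at least
    // receivers + 1 threads: "local[3]" or more (or "local[*]").
    SparkConf conf = new SparkConf()
            .setMaster("local[3]")
            .setAppName("TwoSocketStreams"); // hypothetical app name

    // 1-second batches, matching the one-message-per-second servers.
    JavaStreamingContext ssc =
            new JavaStreamingContext(conf, Durations.seconds(1));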

Regarding java - Why does Spark Streaming stop working when I send two input streams?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/37116079/
