apache-spark - Apache Spark Streaming simple application not working


I have the following problem with the Apache Spark Streaming library. I rewrote the simple "word count" standalone application to see how streaming works, so the code is as follows:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class NetworkWordCount {
    public static void main(String[] args) throws Exception {
        // Run locally with two threads: one for the receiver, one for processing
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));

        // Create a DStream that will connect to hostname:port, like localhost:9999
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(
            new FlatMapFunction<String, String>() {
                @Override public Iterable<String> call(String x) {
                    return Arrays.asList(x.split(" "));
                }
            });

        // Map each word to a (word, 1) pair
        JavaPairDStream<String, Integer> pairs = words.mapToPair(
            new PairFunction<String, String, Integer>() {
                @Override public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String, Integer>(s, 1);
                }
            });

        // Count each word in each batch
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
            new Function2<Integer, Integer, Integer>() {
                @Override public Integer call(Integer i1, Integer i2) throws Exception {
                    return i1 + i2;
                }
            });

        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print();

        jssc.start();            // Start the computation
        jssc.awaitTermination(); // Wait for the computation to terminate
    }
}
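
For reference, compiled against Java 8 the same pipeline can be written with lambda expressions; a minimal sketch, equivalent to the anonymous classes above (assuming Spark's Java 8 lambda support):

    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")));
    JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
    JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);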

When I run this standalone application, the log loops over the following lines:
14/10/08 13:16:44 INFO JobScheduler: Finished job streaming job 1412767004000 ms.0 from job set of time 1412767004000 ms
14/10/08 13:16:44 INFO JobScheduler: Total delay: 0.023 s for time 1412767004000 ms (execution: 0.019 s)
14/10/08 13:16:44 INFO ShuffledRDD: Removing RDD 428 from persistence list
14/10/08 13:16:44 INFO BlockManager: Removing RDD 428
14/10/08 13:16:44 INFO MappedRDD: Removing RDD 427 from persistence list
14/10/08 13:16:44 INFO BlockManager: Removing RDD 427
14/10/08 13:16:44 INFO FlatMappedRDD: Removing RDD 426 from persistence list
14/10/08 13:16:44 INFO BlockManager: Removing RDD 426
14/10/08 13:16:44 INFO BlockRDD: Removing RDD 425 from persistence list
14/10/08 13:16:44 INFO SocketInputDStream: Removing blocks of RDD BlockRDD[425] at BlockRDD at ReceiverInputDStream.scala:69 of time 1412767004000 ms
14/10/08 13:16:44 INFO BlockManager: Removing RDD 425
14/10/08 13:16:44 INFO SocketReceiver: Stopped receiving
14/10/08 13:16:44 INFO SocketReceiver: Closed socket to localhost:9999
14/10/08 13:16:44 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Retrying connecting to localhost:9999
14/10/08 13:16:44 INFO ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Retrying connecting to localhost:9999:
14/10/08 13:16:44 INFO ReceiverSupervisorImpl: Called receiver onStop
14/10/08 13:16:44 INFO ReceiverSupervisorImpl: Deregistering receiver 0
14/10/08 13:16:44 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Retrying connecting to localhost:9999
14/10/08 13:16:44 INFO ReceiverSupervisorImpl: Stopped receiver 0
14/10/08 13:16:45 INFO ReceiverTracker: Stream 0 received 0 blocks
14/10/08 13:16:45 INFO JobScheduler: Added jobs for time 1412767005000 ms
14/10/08 13:16:45 INFO JobScheduler: Starting job streaming job 1412767005000 ms.0 from job set of time 1412767005000 ms
14/10/08 13:16:45 INFO SparkContext: Starting job: take at DStream.scala:608
14/10/08 13:16:45 INFO DAGScheduler: Registering RDD 435 (map at MappedDStream.scala:35)
14/10/08 13:16:45 INFO DAGScheduler: Got job 217 (take at DStream.scala:608) with 1 output partitions (allowLocal=true)
14/10/08 13:16:45 INFO DAGScheduler: Final stage: Stage 433(take at DStream.scala:608)
14/10/08 13:16:45 INFO DAGScheduler: Parents of final stage: List(Stage 434)
14/10/08 13:16:45 INFO DAGScheduler: Missing parents: List()
14/10/08 13:16:45 INFO DAGScheduler: Submitting Stage 433 (ShuffledRDD[436] at combineByKey at ShuffledDStream.scala:42), which has no missing parents
14/10/08 13:16:45 INFO MemoryStore: ensureFreeSpace(2256) called with curMem=23776, maxMem=277842493
14/10/08 13:16:45 INFO MemoryStore: Block broadcast_217 stored as values in memory (estimated size 2.2 KB, free 264.9 MB)
14/10/08 13:16:45 INFO DAGScheduler: Submitting 1 missing tasks from Stage 433 (ShuffledRDD[436] at combineByKey at ShuffledDStream.scala:42)
14/10/08 13:16:45 INFO TaskSchedulerImpl: Adding task set 433.0 with 1 tasks
14/10/08 13:16:45 INFO TaskSetManager: Starting task 0.0 in stage 433.0 (TID 217, localhost, PROCESS_LOCAL, 1008 bytes)
14/10/08 13:16:45 INFO Executor: Running task 0.0 in stage 433.0 (TID 217)
14/10/08 13:16:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/10/08 13:16:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
14/10/08 13:16:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 0 ms
14/10/08 13:16:45 INFO Executor: Finished task 0.0 in stage 433.0 (TID 217). 822 bytes result sent to driver
14/10/08 13:16:45 INFO TaskSetManager: Finished task 0.0 in stage 433.0 (TID 217) in 4 ms on localhost (1/1)
14/10/08 13:16:45 INFO DAGScheduler: Stage 433 (take at DStream.scala:608) finished in 0.006 s
14/10/08 13:16:45 INFO TaskSchedulerImpl: Removed TaskSet 433.0, whose tasks have all completed, from pool
14/10/08 13:16:45 INFO SparkContext: Job finished: take at DStream.scala:608, took 0.009386933 s
14/10/08 13:16:45 INFO SparkContext: Starting job: take at DStream.scala:608
14/10/08 13:16:45 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 108 is 82 bytes
14/10/08 13:16:45 INFO DAGScheduler: Got job 218 (take at DStream.scala:608) with 1 output partitions (allowLocal=true)
14/10/08 13:16:45 INFO DAGScheduler: Final stage: Stage 435(take at DStream.scala:608)
14/10/08 13:16:45 INFO DAGScheduler: Parents of final stage: List(Stage 436)
14/10/08 13:16:45 INFO DAGScheduler: Missing parents: List()
14/10/08 13:16:45 INFO DAGScheduler: Submitting Stage 435 (ShuffledRDD[436] at combineByKey at ShuffledDStream.scala:42), which has no missing parents
14/10/08 13:16:45 INFO MemoryStore: ensureFreeSpace(2256) called with curMem=26032, maxMem=277842493
14/10/08 13:16:45 INFO MemoryStore: Block broadcast_218 stored as values in memory (estimated size 2.2 KB, free 264.9 MB)
14/10/08 13:16:45 INFO DAGScheduler: Submitting 1 missing tasks from Stage 435 (ShuffledRDD[436] at combineByKey at ShuffledDStream.scala:42)
14/10/08 13:16:45 INFO TaskSchedulerImpl: Adding task set 435.0 with 1 tasks
14/10/08 13:16:45 INFO TaskSetManager: Starting task 0.0 in stage 435.0 (TID 218, localhost, PROCESS_LOCAL, 1008 bytes)
14/10/08 13:16:45 INFO Executor: Running task 0.0 in stage 435.0 (TID 218)
14/10/08 13:16:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/10/08 13:16:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
14/10/08 13:16:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 1 ms
14/10/08 13:16:45 INFO Executor: Finished task 0.0 in stage 435.0 (TID 218). 822 bytes result sent to driver
14/10/08 13:16:45 INFO TaskSetManager: Finished task 0.0 in stage 435.0 (TID 218) in 3 ms on localhost (1/1)
14/10/08 13:16:45 INFO TaskSchedulerImpl: Removed TaskSet 435.0, whose tasks have all completed, from pool
14/10/08 13:16:45 INFO DAGScheduler: Stage 435 (take at DStream.scala:608) finished in 0.003 s
14/10/08 13:16:45 INFO SparkContext: Job finished: take at DStream.scala:608, took 0.008348754 s
-------------------------------------------
Time: 1412767005000 ms
-------------------------------------------

On the Web UI, I can see the following screenshot:

(screenshot of the Spark Streaming Web UI omitted)

Clearly, when I type some sample words into netcat -lk 9999, nothing happens.

Can someone help me figure out how to get this example working?

Thanks

Best Answer

As mentioned in the comments, the WARN/ERROR lines in your log ("Restarting receiver with delay 2000 ms: Retrying connecting to localhost:9999") show that the receiver could not connect to port 9999, i.e. nothing was listening on that port yet. So first start a listener in a console:

nc -lk 9999

Then, in the Spark folder, run the following command:
bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999
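
(To run your own compiled standalone version instead of the bundled example, spark-submit takes the application jar instead; a sketch, with hypothetical class and jar names:

bin/spark-submit --class NetworkWordCount --master "local[2]" networkwordcount.jar
)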

Now type some words in the console tab where nc is running:
It is working! Life is beautiful!

and check the output in the Spark console:
(beautiful!,1)
(working!,1)
(is,2)
(It,1)
(Life,1)

If you keep typing words, the program will keep counting them, batch by batch. Note that each batch is counted independently; for a running total across batches, see the sketch below.
Hope this helps.
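
A running total needs the stateful updateStateByKey transformation plus a checkpoint directory. A minimal sketch against the program in the question, assuming Spark 1.x (where Optional is Guava's com.google.common.base.Optional) and a hypothetical checkpoint path:

    import java.util.List;
    import com.google.common.base.Optional;

    // Stateful operations require a checkpoint directory (hypothetical path)
    jssc.checkpoint("/tmp/spark-checkpoint");

    // Fold each batch's (word, 1) pairs into a running total kept per word
    JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(
        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
            @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
                Integer sum = state.or(0);           // previous total, or 0 for a new word
                for (Integer v : values) sum += v;   // add this batch's occurrences
                return Optional.of(sum);
            }
        });
    runningCounts.print();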

Regarding "apache-spark - Apache Spark Streaming simple application not working", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/26255738/
