
hadoop - Kafka Spark Streaming: unable to read messages


I am integrating Kafka and Spark using spark-streaming. Acting as the kafka producer, I created a topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 

I publish messages into kafka and try to read them with spark-streaming Java code and display them on screen.
The daemons are all up: Spark master and worker; zookeeper; kafka.
I am using KafkaUtils.createStream to do it; the code is below:

import java.util.HashMap;
import java.util.Map;

import scala.Tuple2;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class SparkStream {
    public static void main(String args[])
    {
        if(args.length != 3)
        {
            System.out.println("SparkStream <zookeeper_ip> <group_nm> <topic1,topic2,...>");
            System.exit(1);
        }

        Map<String,Integer> topicMap = new HashMap<String,Integer>();
        String[] topic = args[2].split(",");
        for(String t: topic)
        {
            topicMap.put(t, new Integer(1));
        }

        JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        System.out.println("Connection done++++++++++++++");
        JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>()
        {
            public String call(Tuple2<String, String> message)
            {
                System.out.println("NewMessage: "+message._2()+"++++++++++++++++++");
                return message._2();
            }
        });
        data.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
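(For reference, a job like this would be submitted to the standalone master with spark-submit; the jar name, group name, and argument values below are assumptions for illustration, not from the original post:)

bin/spark-submit --class com.spark.SparkStream \
  --master spark://192.168.88.130:7077 \
  sparkstream.jar 192.168.88.130:2181 test-group test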

I run the job, and in another terminal I run the kafka-producer to publish messages:

Hi kafka
second message
another message
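(These were typed into the standard console producer. For a Kafka 0.8.x setup like this one, the invocation is roughly as follows; the broker address is an assumption:)

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test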

But the output log on the spark-streaming console does not show the messages; instead it shows zero blocks received:

-------------------------------------------
Time: 1417438988000 ms
-------------------------------------------

2014-12-01 08:03:08,008 INFO [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Starting job streaming job 1417438988000 ms.0 from job set of time 1417438988000 ms
2014-12-01 08:03:08,008 INFO [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 1417438988000 ms.0 from job set of time 1417438988000 ms
2014-12-01 08:03:08,009 INFO [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.008 s for time 1417438988000 ms (execution: 0.000 s)
2014-12-01 08:03:08,010 INFO [sparkDriver-akka.actor.default-dispatcher-15] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Added jobs for time 1417438988000 ms
2014-12-01 08:03:08,015 INFO [sparkDriver-akka.actor.default-dispatcher-15] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 39 from persistence list
2014-12-01 08:03:08,024 INFO [sparkDriver-akka.actor.default-dispatcher-4] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 39
2014-12-01 08:03:08,027 INFO [sparkDriver-akka.actor.default-dispatcher-15] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 38 from persistence list
2014-12-01 08:03:08,031 INFO [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 38
2014-12-01 08:03:08,033 INFO [sparkDriver-akka.actor.default-dispatcher-15] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[38] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417438988000 ms
2014-12-01 08:03:09,002 INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks

Why is no data block being received? I have tried the kafka producer-consumer on the console, bin/kafka-console-producer.... and bin/kafka-console-consumer..., and it works perfectly, so why not my code... any ideas?

Best Answer

The problem is solved.

The code above is correct. We just add two more lines to suppress the [INFO] and [WARN] messages that were being generated. So the final code is:

package com.spark;

import scala.Tuple2;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
import kafka.serializer.Decoder;
import kafka.serializer.Encoder;
import org.apache.spark.streaming.Duration;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.*;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import java.util.Map;
import java.util.HashMap;

public class SparkStream {
    public static void main(String args[])
    {
        if(args.length != 3)
        {
            System.out.println("SparkStream <zookeeper_ip> <group_nm> <topic1,topic2,...>");
            System.exit(1);
        }

        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);

        Map<String,Integer> topicMap = new HashMap<String,Integer>();
        String[] topic = args[2].split(",");
        for(String t: topic)
        {
            topicMap.put(t, new Integer(3));
        }

        JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(1000));
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        System.out.println("Connection done++++++++++++++");
        JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>()
        {
            public String call(Tuple2<String, String> message)
            {
                return message._2();
            }
        });
        data.print();

        jssc.start();
        jssc.awaitTermination();
    }
}

We also need to add the dependency in the POM.xml:

<dependency>
    <groupId>com.msiops.footing</groupId>
    <artifactId>footing-tuple</artifactId>
    <version>0.2</version>
</dependency>

This dependency is used in order to use scala.Tuple2.
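(Beyond that, the Spark Streaming core and its Kafka integration have to be on the compile classpath as well. Assuming a Spark 1.1.x build on Scala 2.10, matching the era of the logs above, the coordinates would look like this; adjust the versions to your installation:)

<!-- assumed versions: Spark 1.1.x on Scala 2.10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.1.0</version>
</dependency>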
The Stream 0 received 0 blocks error was because the spark-worker was not available and the spark worker cores were set to 1. Spark streaming needs cores >= 2, so we have to change the spark config file (refer to the installation manual): add the line export SPARK_WORKER_CORES=5, and also change SPARK_MASTER='hostname' to SPARK_MASTER=<your local IP>. This local IP is what you see in bold when you open the Spark UI web console, something like spark://192.168.*.*:<port>. We do not need the port here, only the IP.
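(A minimal sketch of the conf/spark-env.sh edit, assuming the stock Spark 1.x variable names SPARK_WORKER_CORES and SPARK_MASTER_IP; the IP value is this post's example, adjust both to your own setup:)

# conf/spark-env.sh -- assumed Spark 1.x variable names
export SPARK_WORKER_CORES=5            # streaming needs >= 2 cores per worker
export SPARK_MASTER_IP=192.168.88.130  # the IP shown in the Spark UI, without the port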
Now restart your spark-master and spark-worker and start streaming :)

Output:

-------------------------------------------
Time: 1417443060000 ms
-------------------------------------------
message 1

-------------------------------------------
Time: 1417443061000 ms
-------------------------------------------
message 2

-------------------------------------------
Time: 1417443063000 ms
-------------------------------------------
message 3
message 4

-------------------------------------------
Time: 1417443064000 ms
-------------------------------------------
message 5
message 6
messag 7

-------------------------------------------
Time: 1417443065000 ms
-------------------------------------------
message 8

Regarding hadoop - Kafka Spark Streaming: unable to read messages, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/27182704/
