
java - Spark Streaming Kafka messages not being consumed


I want to use Spark (1.6.2) Streaming to receive messages from a topic in Kafka (broker v0.10.2.1).

I am using the Receiver-based approach. The code is as follows:

public static void main(String[] args) throws Exception
{
    SparkConf sparkConf = new SparkConf().setAppName("SimpleStreamingApp");
    JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, new Duration(5000));
    //
    Map<String, Integer> topicMap = new HashMap<>();
    topicMap.put("myTopic", 1);
    //
    String zkQuorum = "host1:port1,host2:port2,host3:port3";
    //
    Map<String, String> kafkaParamsMap = new HashMap<>();
    kafkaParamsMap.put("bootstraps.server", zkQuorum);
    kafkaParamsMap.put("metadata.broker.list", zkQuorum);
    kafkaParamsMap.put("zookeeper.connect", zkQuorum);
    kafkaParamsMap.put("group.id", "group_name");
    kafkaParamsMap.put("security.protocol", "SASL_PLAINTEXT");
    kafkaParamsMap.put("security.mechanism", "GSSAPI");
    kafkaParamsMap.put("ssl.kerberos.service.name", "kafka");
    kafkaParamsMap.put("key.deserializer", "kafka.serializer.StringDecoder");
    kafkaParamsMap.put("value.deserializer", "kafka.serializer.DefaultDecoder");
    //
    JavaPairReceiverInputDStream<byte[], byte[]> stream = KafkaUtils.createStream(javaStreamingContext,
            byte[].class, byte[].class,
            DefaultDecoder.class, DefaultDecoder.class,
            kafkaParamsMap,
            topicMap,
            StorageLevel.MEMORY_ONLY());

    VoidFunction<JavaPairRDD<byte[], byte[]>> voidFunc = new VoidFunction<JavaPairRDD<byte[], byte[]>>()
    {
        public void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception
        {
            List<Tuple2<byte[], byte[]>> all = rdd.collect();
            System.out.println("size of rdd: " + all.size());
        }
    };

    stream.forEach(voidFunc);

    javaStreamingContext.start();
    javaStreamingContext.awaitTermination();
}

Access to Kafka is kerberized. I launch with:

spark-submit --verbose --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" --files jaas.conf,privKey.der --principal <accountName> --keytab <path to keytab file> --master yarn --jars <comma separated path to all jars> --class <fully qualified java main class> <path to jar file containing main class>
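For reference (not part of the question), a minimal sketch of what the jaas.conf shipped via --files might contain for a keytab-based Kerberos login; the principal, keytab path, and realm below are placeholders, not values from the question:

    // Hypothetical JAAS login section for the Kafka client;
    // principal and keyTab are placeholder values.
    KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="./user.keytab"
        principal="user@EXAMPLE.COM"
        serviceName="kafka";
    };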

After launch, I observe the following:

  1. The VerifiableProperties class from Kafka logs warning messages for the properties contained in the kafkaParams HashMap:
INFO KafkaReceiver: connecting to zookeeper: <the correct zookeeper quorum provided in kafkaParams map>

VerifiableProperties: Property auto.offset.reset is overridden to largest
VerifiableProperties: Property enable.auto.commit is not valid.
VerifiableProperties: Property sasl.kerberos.service.name is not valid
VerifiableProperties: Property key.deserializer is not valid
...
VerifiableProperties: Property zookeeper.connect is overridden to ....

I thought that, since these properties are not accepted, this might be affecting the stream processing.

**When I launch in cluster mode (--master yarn), these warning messages do not appear.**

  2. Later, I see the following logs repeating every 5 seconds, as configured:

    INFO BlockRDD: Removing RDD 4 from persistence list

    INFO KafkaInputDStream: Removing blocks of RDD BlockRDD[4] at createStream at ...

    INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()

    INFO ... INFO BlockManager: Removing RDD 4

However, I don't see any actual messages printed on the console.

Question: why doesn't my code print any actual messages?

My Gradle dependencies are:

compile group: 'org.apache.spark', name: 'spark-core_2.10', version: '1.6.2'
compile group: 'org.apache.spark', name: 'spark-streaming_2.10', version: '1.6.2'
compile group: 'org.apache.spark', name: 'spark-streaming-kafka_2.10', version: '1.6.2'

Best Answer

stream is a JavaPairReceiverInputDStream object. Convert it to a DStream and use foreachRDD to print the messages consumed from Kafka.
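A minimal sketch of that change against the code in the question: keep the stream definition as-is and replace the stream.forEach(voidFunc) call with foreachRDD. Decoding the byte[] values as UTF-8 text below is an illustrative assumption, not something specified in the question:

    import java.util.List;

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.VoidFunction;

    import scala.Tuple2;

    // foreachRDD registers an output operation that runs once per micro-batch
    stream.foreachRDD(new VoidFunction<JavaPairRDD<byte[], byte[]>>()
    {
        @Override
        public void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception
        {
            // collect() ships the whole batch to the driver: fine for
            // debugging, not for production-sized batches
            List<Tuple2<byte[], byte[]>> all = rdd.collect();
            System.out.println("batch size: " + all.size());
            for (Tuple2<byte[], byte[]> record : all)
            {
                // assumption: message values are UTF-8 text
                System.out.println("message: " + new String(record._2(), "UTF-8"));
            }
        }
    });

An output operation such as foreachRDD (or print()) is what actually triggers consumption of each batch; without one registered, Spark Streaming has nothing to execute.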

Regarding java - Spark Streaming Kafka messages not being consumed, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/48286830/
