gpt4 book ai didi

apache-spark - 在 Spark Streaming (Spark 2.0) 中使用 Kafka

转载 作者:行者123 更新时间:2023-12-04 04:45:29 25 4
gpt4 key购买 nike

我发现有两种方法可以在 Spark Streaming (Spark 2.0) 中使用 Kafka 主题:

1) 使用KafkaUtils.createDirectStream每k秒获取一次DStream,请引用this document

2) 使用 kafka: sqlContext.read.format(“json”).stream(“kafka://KAFKA_HOST”) 为 Spark 2.0 的新特性:Structured 创建一个无限的 DataFrame流媒体,相关文档 is here

方法 1) 有效,但 2) 无效,我得到以下错误

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.DataFrameReader.stream(Ljava/lang/String;)Lorg/apache/spark/sql/Dataset;
...
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

我的问题是:
“kafka://KAFKA_HOST” 指的是什么?
我应该如何解决这个问题?

提前致谢!

最佳答案

Spark 2.0 尚不支持将 Kafka 作为无限数据帧/集的来源。计划在2.1加入支持

编辑:(6.12.2016)

Kafka 0.10 现在是 expiramentaly supported in Spark 2.0.2 :

val ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()

ds1
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]

关于apache-spark - 在 Spark Streaming (Spark 2.0) 中使用 Kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38578909/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com