gpt4 book ai didi

apache-spark - Spark-Streaming 最早在 kafka 开始偏移时挂起(Kafka 2,spark 2.4.3)

转载 作者:行者123 更新时间:2023-12-04 12:06:05 35 4
gpt4 key购买 nike

我遇到了 Spark-Streaming 和 Kafka 的问题。在运行示例程序以从 Kafka 主题中消费并将微批处理结果输出到终端时,当我设置选项时,我的工作似乎挂起:
df.option("startingOffsets", "earliest")
从最新的偏移量开始作业工作正常,当每个微批次流过时,结果会打印到终端。

我在想这可能是资源问题——我正在尝试从一个包含大量数据的主题中读取数据。但是我似乎没有内存/cpu 问题(使用本地 [*] 集群运行此作业)。这项工作似乎从未真正开始,但只是悬而未决:
19/09/17 15:21:37 INFO Metadata: Cluster ID: JFXVL24JQ3K4CEbE-VA58A

  val sc = new SparkConf().setMaster("local[*]").setAppName("spark-test")
val streamContext = new StreamingContext(sc, Seconds(1))
val spark = SparkSession.builder().appName("spark-test")
.getOrCreate()

val topic = "topic.with.alotta.data"

//subscribe tokafka
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "127.0.0.1:9092")
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.load()

//write
df.writeStream
.outputMode("append")
.format("console")
.option("truncate", "false")
.start()
.awaitTermination()


我希望看到打印到控制台的结果......但是,应用程序似乎就像我提到的那样挂起。有什么想法吗?感觉像是一个 Spark 资源问题(因为我正在针对一个有大量数据的主题运行一个本地“集群”。是否有关于我缺少的流数据帧的性质?

最佳答案

写入控制台会导致每次触发时在驱动程序的内存中收集所有数据。由于您目前没有限制批次的大小,这意味着整个主题内容都在驱动程序中累积。见 https://spark.apache.org/docs/2.4.3/structured-streaming-programming-guide.html#output-sinks

设置批量大小的限制应该可以解决您的问题。
尝试添加 maxOffsetsPerTrigger从 Kafka 读取时的设置...

  val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "127.0.0.1:9092")
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 1000)
.load()

https://spark.apache.org/docs/2.4.3/structured-streaming-kafka-integration.html详情。

关于apache-spark - Spark-Streaming 最早在 kafka 开始偏移时挂起(Kafka 2,spark 2.4.3),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57982370/

35 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com