gpt4 book ai didi

java - 清除检查点位置后 Spark 结构化流消耗的旧 Kafka 偏移量

转载 作者:行者123 更新时间:2023-11-30 05:49:18 24 4
gpt4 key购买 nike

我使用 Apache Kafka 和 Apache Spark 结构化流构建了一个应用程序。我面临以下问题。

场景:

  • 我设置了一个带有 Kafka 主题源的 Spark 结构化流,并且沉入 Kafka 主题。
  • 我们运行流并在 Kafka 上生成大量消息主题。
  • 我们通过清除检查点停止了流并重新启动流流的位置。运行 5 至 6 小时后,流是随机消费旧的 Kafka 消息。

清除检查点位置后,我只期待流中只有新消息。
Spark 版本:2.4.0,卡夫卡客户端版本:2.0.0,卡夫卡版本:2.0.0,集群管理器:Kubernetes。

我已经通过更改检查点位置尝试了这种情况,但问题仍然存在。

{
SparkConf sparkConf = new SparkConf().setAppName("SparkKafkaConsumer");
SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();
Dataset<Row> stream = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option(subscribeType, "REQUEST_TOPIC")
.option("failOnDataLoss",false)
.option("maxOffsetsPerTrigger","50")
.option("startingOffsets","latest")
.load()
.selectExpr(
"CAST(value AS STRING) as payload",
"CAST(key AS STRING)",
"CAST(topic AS STRING)",
"CAST(partition AS STRING)",
"CAST(offset AS STRING)",
"CAST(timestamp AS STRING)",
"CAST(timestampType AS STRING)");

DataStreamWriter<String> dataWriterStream = stream
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("kafka.max.request.size", "35000000")
.option("kafka.retries", "5")
.option("kafka.batch.size", "35000000")
.option("kafka.receive.buffer.bytes", "200000000")
.option("kafka.acks","0")
.option("kafka.compression.type", "snappy")
.option("kafka.linger.ms", "0")
.option("kafka.buffer.memory", "50000000")
.option("topic", "RESPONSE_TOPIC")
.outputMode("append")
.option("checkpointLocation", checkPointDirectory);
spark.streams().awaitAnyTermination();

}

最佳答案

检查下面的链接,

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-checkpointing.html

您可以调用 SparkContext.setCheckpointDir(directory: String) 来设置检查点目录 - RDD 被检查点的目录。如果在集群上运行,该目录必须是 HDFS 路径。原因是驱动程序可能会尝试从其自己的本地文件系统重建带有检查点的 RDD,这是不正确的,因为检查点文件实际上位于执行器计算机上

关于java - 清除检查点位置后 Spark 结构化流消耗的旧 Kafka 偏移量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54233328/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com