gpt4 book ai didi

apache-spark - 如何将 Spark 消耗的最新偏移量保存到 ZK 或 Kafka 并在重启后可以回读

转载 作者:行者123 更新时间:2023-12-03 22:16:33 25 4
gpt4 key购买 nike

我正在使用 Kafka 0.8.2从 AdExchange 接收数据然后我使用 Spark Streaming 1.4.1将数据存储到 MongoDB .

我的问题是当我重新启动 Spark Streaming 时工作例如更新新版本,修复错误,添加新功能。会继续阅读最新的offsetkafka当时我将在重新启 Action 业期间丢失数据 AdX 推送到 kafka。

我尝试类似 auto.offset.reset -> smallest但它会从 0 -> 最后接收数据,然后数据很大并且在 db 中重复。

我也尝试设置特定的 group.idconsumer.idSpark但它是一样的。

如何保存最新的offset Spark 消耗到 zookeeperkafka然后可以从那个读回最新的 offset ?

最佳答案

createDirectStream 函数的构造函数之一可以获得一个映射,该映射将保存分区 id 作为键和您开始使用的偏移量作为值。

看看这里的api:http://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html
我所说的 map 通常称为:fromOffsets

您可以向 map 插入数据:

startOffsetsMap.put(TopicAndPartition(topicName,partitionId), startOffset)

并在创建直接流时使用它:
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
streamingContext, kafkaParams, startOffsetsMap, messageHandler(_))

每次迭代后,您可以使用以下方法获取处理后的偏移量:
rdd.asInstanceOf[HasOffsetRanges].offsetRanges

您将能够使用此数据在下一次迭代中构建 fromOffsets 映射。

你可以在这里看到完整的代码和用法: https://spark.apache.org/docs/latest/streaming-kafka-integration.html在页面的末尾

关于apache-spark - 如何将 Spark 消耗的最新偏移量保存到 ZK 或 Kafka 并在重启后可以回读,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31846654/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com