gpt4 book ai didi

apache-spark - 如果在提供给 kafka 的数据中遇到意外格式,当您重新启动 spark 作业时会发生什么

转载 作者:行者123 更新时间:2023-12-04 04:10:51 26 4
gpt4 key购买 nike

我有一个关于 Spark Structured Streaming with Kafka 的问题。假设我正在运行一个 spark 作业,并且一切都运行良好。有一天,我的 spark 作业失败了,因为提供给 kafka 的数据不一致。不一致可能是数据格式问题或 spark 无法处理的垃圾字符。在这种情况下,我们如何解决问题?有没有一种方法可以让我们进入 kafka 主题并手动更改数据?

如果我们不修复数据问题并重新启动 spark 作业,它将读取导致失败的相同旧行,因为我们尚未提交检查点。那么我们如何摆脱这个循环。如何解决 Kafka 主题中的数据问题以恢复中止的 Spark 作业?

最佳答案

除非您真的知道自己在做什么,否则我会避免尝试手动更改 Kafka 主题中的单个消息。

为防止将来发生这种情况,您可能需要考虑为您的数据使用模式(结合模式注册表)。

为了缓解您描述的问题,我看到了以下选项:

  • 手动更改结构化流应用程序的 Consumer Group 的偏移量
  • 创建一个从特定偏移量开始读取的"new"流作业

手动更改偏移量

当使用 Sparks 结构化流时,消费者组由 Spark 自动设置。根据code消费者组将被定义为:

val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"

您可以使用 kafka-consumer-groups 工具更改偏移量。首先通过

识别消费组的实际名称
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

然后为特定主题的消费者组设置偏移量(例如偏移量 100)

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --execute --reset-offsets --group spark-kafka-source-1337 --topic topic1 --to-offset 100

如果您只需要更改特定分区的偏移量,您可以查看该工具的帮助功能以了解如何执行此操作。

创建新的流媒体作业

您可以使用 Spark 选项 startingOffsets,如 Spark + Kafka integration guide 中所述:

Option: startingOffsets

value: "earliest", "latest" (streaming only), or json string """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """

default: "latest" for streaming, "earliest" for batch

meaning: The start point when a query is started, either "earliest" which is from the earliest offsets, "latest" which is just from the latest offsets, or a json string specifying a starting offset for each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest. Note: For batch queries, latest (either implicitly or by using -1 in json) is not allowed. For streaming queries, this only applies when a new query is started, and that resuming will always pick up from where the query left off. Newly discovered partitions during a query will start at earliest.

要实现这一点,重要的是要有一个"new"查询。这意味着您需要删除现有作业的检查点文件或创建完整的新应用程序。

关于apache-spark - 如果在提供给 kafka 的数据中遇到意外格式,当您重新启动 spark 作业时会发生什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61757770/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com