gpt4 book ai didi

java - 版本冲突,当前版本 [2] 与提供的版本 [1] 不同

转载 作者:行者123 更新时间:2023-12-01 19:43:30 27 4
gpt4 key购买 nike

我有一个 Kafka 主题和一个 Spark 应用程序。 Spark 应用程序从 Kafka 主题获取数据,对其进行预聚合并将其存储在 Elastic Search 中。听起来很简单,对吧?

一切都按预期工作正常,但是当我将“spark.cores”属性设置为 1 以外的值时,我开始得到

version conflict, current version [2] is different than the one provided [1]

经过一番研究,我认为错误是因为多个核心可以同时拥有相同的文档,因此,当一个核心完成聚合并尝试写回文档时,它会收到此错误

TBH,我对这种行为感到有点惊讶,因为我认为 Spark 和 ES 会自己处理这个问题。这让我相信,也许我的方法有问题。

我该如何解决这个问题?我需要遵循某种“同步”或“锁定”概念吗?

干杯!

最佳答案

听起来队列中有几条消息,它们都更新同一个 ES 文档,并且这些消息正在同时处理。有两种可能的解决方案:

首先,可以使用Kafka分区来确保所有更新同一个ES文档的消息都按顺序处理。这假设您的消息中有一些属性,Kafka 可以使用这些属性来确定消息如何映射到 ES 文档。

另一种方法是处理乐观并发冲突的标准方法:重试事务。如果你有一些来自Kafka消息的数据需要添加到ES文档中,并且ES中当前的文档是版本1,那么你可以尝试更新它并保存回版本2。但是如果其他人已经写了版本2 ,您可以使用版本 2 作为起点,添加新数据并保存版本 3 来重试。

如果这些方法中的任何一种破坏了您期望从 Kafka 和 Spark 获得的并发性,那么您可能需要重新考虑您的方法。您可能必须引入一个新的处理阶段,该阶段会执行一些繁重的工作,但实际上并不写入 ES,然后在单独的步骤中进行 ES 更新。

关于java - 版本冲突,当前版本 [2] 与提供的版本 [1] 不同,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54462673/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com