gpt4 book ai didi

python - Azure Eventhub (Python) : checkpointing with blob storage - keyerror issue in EventProcessor when checkpointing is enabled

转载 作者:行者123 更新时间:2023-12-04 12:58:11 34 4
gpt4 key购买 nike

我在 eventhub 中遇到了 Blob 存储检查点问题。如果我在获取消费者客户端时没有设置 checkpoint_store,我的应用程序运行正常。每当我尝试设置 checkpoint_store 变量并运行我的代码时,它都会引发以下异常:

EventProcessor instance 'xxxxxxxxxxx' of eventhub <name of my eventhub> consumer group <name of my consumer group>. An error occurred while load-balancing and claiming ownership. The exception is KeyError('ownerid'). Retrying after xxxx seconds

我能找到的唯一提到这种错误的 github 条目是 this one ,但是问题本身从未得到解决,有问题的人最终使用了不同的库。

我正在使用的相关库是 azure-eventhub 和 azure-eventhub-checkpointstoreblob-aio

以下是我正在使用的代码的相关片段 ( I used this tutorial as a guide ):

import asyncio
from azure.eventhub.aio import EventHubConsumerClient, EventHubProducerClient
from azure.eventhub import EventData
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
async def on_event(partition_context, event):
await partition_context.update_checkpoint(event)
#<do stuff with event data>
checkpoint_store = BlobCheckpointStore.from_connection_string(blob_connection_string, container_name)
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=input_eventhub_name, checkpoint_store=checkpoint_store)

async def main():
async with client:
await client.receive(
on_event=on_event,
)
print("Terminated.")

if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

问题似乎仅与 Blob 存储检查点有关;如果我在创建消费者客户端时注释掉“checkpoint_store=checkpoint_store”,则一切运行都不会出现问题。

与 Blob 存储的连接看起来很好,因为我进行了一些挖掘,发现在 Blob 存储中创建了一些文件夹“检查点”和“所有权”: blob storage snapshot后者包含一些元数据中带有“ownerid”的文件: owner files metadata

即 key 肯定存在。我认为正在发生的情况是,EventProcessor 正在尝试获取这些 blob 的所有权元数据,但不知何故未能成功。如果有人知道如何解决这个问题,我将非常感激!

最佳答案

这看起来像是从 Blob 之一检索“ownerid”时出现的问题。您能帮我测试一下这些场景吗?

  1. 从 Blob 容器中删除所有内容并重试。
  2. 如果问题仍然存在,您能否检查每个 blob 是否都具有元数据“ownerid”?
  3. 如果问题仍然存在,您能否将库 azure-eventhub-checkpointstoreblob-aio 版本 1.1.0 中文件 azure.eventhub.extensions.checkpointstoreblobaio._blobstoragecsaio.py 的第 144 行替换为以下内容,然后重试?
"owner_id": blob.metadata.get("ownerid"),

关于python - Azure Eventhub (Python) : checkpointing with blob storage - keyerror issue in EventProcessor when checkpointing is enabled,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63354884/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com