
python - Why does the Azure Event Hub Python library throw KeyError: 'all-partitions' when it reaches the maximum size?


We are upgrading some scripts that use the Python libraries for Azure Event Hubs to the latest version (5.0). I mostly followed the example in the documentation section titled "Publish events to an event hub". When I first read the code, I found it curious that it relies on hitting a ValueError exception; that doesn't seem like the best design, but I went along with it anyway. I'll paste the sample code here to save readers some tab switching:

# THIS IS THE EXAMPLE CODE FROM MICROSOFT
event_data_batch = client.create_batch()
can_add = True
while can_add:
    try:
        event_data_batch.add(EventData('Message inside EventBatchData'))
    except ValueError:
        can_add = False  # EventDataBatch object reaches max_size.

with client:
    client.send_batch(event_data_batch)

We query various APIs and then send the data to Event Hub, so I already had a for loop that iterates over the events and sends them one at a time. We want batching to make this faster and more efficient. Here is how I integrated the example into the for loop:

# THIS IS OUR CUSTOM SCRIPT
self.output_client = EventHubProducerClient.from_connection_string(conn_str, eventhub_name=eventhub_name)


if "eventhub" in self.output_config.keys():
    if self.output_config['eventhub'] is True:
        if events:
            i = 0
            event_data_batch = self.output_client.create_batch()
            for event in events:
                try:
                    event_data_batch.add(EventData(json.dumps(event)))
                except ValueError:  # EventDataBatch object reaches max_size.
                    # Ship events
                    with self.output_client:
                        self.output_client.send_batch(event_data_batch)
                    # Set up the next batch
                    event_data_batch = self.output_client.create_batch()
                except Exception as e:
                    self.output_error = True
                    self.logger.error("Error shipping event to EventHub: {}".format(e))
                i += 1

            if not self.output_error:
                if events:
                    with self.output_client:
                        self.output_client.send_batch(event_data_batch)
                    self.logger.info("Sent %d events" % (len(events)))
            else:
                self.logger.error("Error(s) sending %d / %d events" % (i, len(events)))

Note how we ship the events inside the `if not self.output_error` block, because sometimes we may not reach the maximum size that triggers the ValueError from the example. In any case, when testing, everything works as long as we stay under the limit, but if we reach the maximum size we get this error (which we haven't been able to solve yet):

2020-03-02 12:59:43,697 - DEBUG - o365-dev - Period is 30
2020-03-02 12:59:43,699 - DEBUG - o365-dev - Output handling 1952 events.

Traceback (most recent call last):
  File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 230, in output
    event_data_batch.add(EventData(json.dumps(event)))
  File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_common.py", line 364, in add
    self.max_size_in_bytes
ValueError: EventDataBatch has reached its size limit: 1046528

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_producer_client.py", line 216, in send_batch
    cast(EventHubProducer, self._producers[partition_id]).send(
KeyError: 'all-partitions'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "cloud-connector.py", line 175, in <module>
    main()
  File "cloud-connector.py", line 171, in main
    cloud.setup_connections()
  File "cloud-connector.py", line 135, in setup_connections
    self.connections[conn['name']] = self.modules[conn['module']].Module(conn['name'], self.config['output'], loglevel=self.logger.getEffectiveLevel())
  File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 89, in __init__
    self.run()
  File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 173, in run
    self.output(events)
  File "C:\Code\github\cc_eh_batching\modules\base\__init__.py", line 234, in output
    self.output_client.send_batch(event_data_batch)
  File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_producer_client.py", line 220, in send_batch
    self._start_producer(partition_id, send_timeout)
  File "C:\Code\github\cc_eh_batching\venv\lib\site-packages\azure\eventhub\_producer_client.py", line 126, in _start_producer
    not self._producers[partition_id]
KeyError: 'all-partitions'

Best Answer

@jthack, `with self.output_client:` closes `output_client` once the block completes. You use it twice, so the second time you try to use the now-closed client, it is in a bad state. I suggest wrapping all of the code in a single `with` statement.
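The close-on-exit behavior can be demonstrated with a small stand-in class (`FakeClient` is hypothetical, not part of the SDK); it mimics a client whose `__exit__` closes it for good, which is roughly what happens to `EventHubProducerClient` here (its `close()` tears down the internal producers, and a later `send_batch` then fails looking them up, surfacing as `KeyError: 'all-partitions'`):

```python
class FakeClient:
    """Stand-in mimicking a client that is permanently closed by its `with` block."""

    def __init__(self):
        self.closed = False

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.closed = True  # close-on-exit; re-entering `with` does NOT reopen it

    def send_batch(self, batch):
        if self.closed:
            # The real SDK fails differently (KeyError: 'all-partitions'),
            # but the root cause is the same: using a closed client.
            raise RuntimeError("client is closed")
        return len(batch)


client = FakeClient()
with client:
    client.send_batch([1, 2, 3])   # first use: fine

try:
    with client:                   # second `with` on the same, now-closed client
        client.send_batch([4])
except RuntimeError as e:
    print(e)                       # → client is closed
```

This is why the question's code fails only when the size limit is hit: the `except ValueError` branch is the first place the client gets used (and closed), so the final `send_batch` runs against a dead client.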

# THIS IS OUR CUSTOM SCRIPT
self.output_client = EventHubProducerClient.from_connection_string(conn_str, eventhub_name=eventhub_name)

with self.output_client:
    if "eventhub" in self.output_config.keys():
        if self.output_config['eventhub'] is True:
            if events:
                i = 0
                event_data_batch = self.output_client.create_batch()
                for event in events:
                    try:
                        event_data_batch.add(EventData(json.dumps(event)))
                    except ValueError:  # EventDataBatch object reaches max_size.
                        # Ship events
                        self.output_client.send_batch(event_data_batch)
                        # Set up the next batch and re-add the event that did not
                        # fit, so it isn't silently dropped
                        event_data_batch = self.output_client.create_batch()
                        event_data_batch.add(EventData(json.dumps(event)))
                    except Exception as e:
                        self.output_error = True
                        self.logger.error("Error shipping event to EventHub: {}".format(e))
                    i += 1

                if not self.output_error:
                    if events:
                        self.output_client.send_batch(event_data_batch)
                        self.logger.info("Sent %d events" % (len(events)))
                else:
                    self.logger.error("Error(s) sending %d / %d events" % (i, len(events)))
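The flush-and-continue pattern in that loop can be sketched independently of the SDK. `ship_in_batches` and its item-count cap are hypothetical stand-ins (a real `EventDataBatch` is capped in bytes, and signals overflow via ValueError rather than a size check), but the control flow is the same: flush when full, start a fresh batch, and make sure the overflowing event lands in the new batch instead of being dropped.

```python
def ship_in_batches(events, send, max_batch=3):
    """Accumulate events and flush via `send` whenever the batch is full (sketch)."""
    batch = []
    for ev in events:
        if len(batch) >= max_batch:  # batch "full": flush and start a new one
            send(batch)
            batch = []
        batch.append(ev)             # the overflowing event goes into the new batch
    if batch:                        # flush the final, possibly partial batch
        send(batch)


sent = []
ship_in_batches(range(7), sent.append)
# sent == [[0, 1, 2], [3, 4, 5], [6]]
```

With the SDK's exception-driven API the same guarantee requires explicitly re-adding the event whose `add()` raised ValueError to the freshly created batch; otherwise that event is lost every time the size limit is hit.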

Regarding "python - Why does the Azure Event Hub Python library throw KeyError: 'all-partitions' when it reaches the maximum size?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/60513714/
