gpt4 book ai didi

python - 如何在 Python 中存储 azure.eventhub.common.Offset?

转载 作者:行者123 更新时间:2023-12-03 05:41:37 30 4
gpt4 key购买 nike

根据official documentation对于 Azure 事件中心,消费者有责任管理偏移量。引用:

Consumers are responsible for storing their own offset values outside of the Event Hubs service.

但是看看 API doc for event hub Offset class ,很明显它无法提供序列化或其他存储方式。

所以我的问题是:我将如何存储事件中心偏移量?

最佳答案

请仔细引用common.py的源码GitHub 存储库 Azure/azure-event-hubs-pythonOffset class 在第 253 行定义如下。

class Offset(object):
"""
The offset (position or timestamp) where a receiver starts. Examples:
Beginning of the event stream:
>>> offset = Offset("-1")
End of the event stream:
>>> offset = Offset("@latest")
Events after the specified offset:
>>> offset = Offset("12345")
Events from the specified offset:
>>> offset = Offset("12345", True)
Events after a datetime:
>>> offset = Offset(datetime.datetime.utcnow())
Events after a specific sequence number:
>>> offset = Offset(1506968696002)
"""

def __init__(self, value, inclusive=False):
"""
Initialize Offset.
:param value: The offset value.
:type value: ~datetime.datetime or int or str
:param inclusive: Whether to include the supplied value as the start point.
:type inclusive: bool
"""
self.value = value
self.inclusive = inclusive

def selector(self):
"""
Creates a selector expression of the offset.
:rtype: bytes
"""
operator = ">=" if self.inclusive else ">"
if isinstance(self.value, datetime.datetime):
timestamp = (calendar.timegm(self.value.utctimetuple()) * 1000) + (self.value.microsecond/1000)
return ("amqp.annotation.x-opt-enqueued-time {} '{}'".format(operator, int(timestamp))).encode('utf-8')
if isinstance(self.value, six.integer_types):
return ("amqp.annotation.x-opt-sequence-number {} '{}'".format(operator, self.value)).encode('utf-8')
return ("amqp.annotation.x-opt-offset {} '{}'".format(operator, self.value)).encode('utf-8')

根据Offset类的源代码,它只是一个普通的Python类,具有两个属性valueinclusive。您可以简单地将其属性的值存储为 json 字符串或其他字符串,或者只是提取这些值,如下面的示例代码。

from azure.eventhub.common import Offset
offset = Offset("-1")
print(offset.value, offset.inclusive)
# -1 False
print(offset.__dict__)
# {'value': '-1', 'inclusive': False}
import json
offset_json = json.dumps(offset.__dict__)
# '{"value": "-1", "inclusive": false}'

注意:将来,GitHub 存储库 Azure/azure-event-hubs-python将完成向 GitHub 存储库的迁移 Azure/azure-sdk-for-pythonOffset 类的更改被重命名为 EventPosition具有相同属性 valueinclusive 的类。

关于python - 如何在 Python 中存储 azure.eventhub.common.Offset?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58267623/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com