How can I store azure.eventhub.common.Offset in Python?
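According to the official documentation for Azure Event Hubs, consumers are responsible for managing their offsets. Quote:
Consumers are responsible for storing their own offset values outside of the Event Hubs service.
However, looking at the API doc for the Event Hubs Offset class, it clearly provides no serialization or any other means of storing it.
So my question is: how am I supposed to store the Event Hubs offset?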
Please take a close look at common.py in the GitHub repo Azure/azure-event-hubs-python. The Offset class is defined at line 253 of the source as follows:
class Offset(object):
    """
    The offset (position or timestamp) where a receiver starts. Examples:

    Beginning of the event stream:
      >>> offset = Offset("-1")
    End of the event stream:
      >>> offset = Offset("@latest")
    Events after the specified offset:
      >>> offset = Offset("12345")
    Events from the specified offset:
      >>> offset = Offset("12345", True)
    Events after a datetime:
      >>> offset = Offset(datetime.datetime.utcnow())
    Events after a specific sequence number:
      >>> offset = Offset(1506968696002)
    """

    def __init__(self, value, inclusive=False):
        """
        Initialize Offset.

        :param value: The offset value.
        :type value: ~datetime.datetime or int or str
        :param inclusive: Whether to include the supplied value as the start point.
        :type inclusive: bool
        """
        self.value = value
        self.inclusive = inclusive

    def selector(self):
        """
        Creates a selector expression of the offset.

        :rtype: bytes
        """
        operator = ">=" if self.inclusive else ">"
        if isinstance(self.value, datetime.datetime):
            timestamp = (calendar.timegm(self.value.utctimetuple()) * 1000) + (self.value.microsecond/1000)
            return ("amqp.annotation.x-opt-enqueued-time {} '{}'".format(operator, int(timestamp))).encode('utf-8')
        if isinstance(self.value, six.integer_types):
            return ("amqp.annotation.x-opt-sequence-number {} '{}'".format(operator, self.value)).encode('utf-8')
        return ("amqp.annotation.x-opt-offset {} '{}'".format(operator, self.value)).encode('utf-8')
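According to the source code of the Offset class, it is just a plain Python class with two attributes, value and inclusive. You can simply store the values of these attributes as a JSON string or in some other form, or just extract the values, as in the sample code below.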
from azure.eventhub.common import Offset
offset = Offset("-1")
print(offset.value, offset.inclusive)
# -1 False
print(offset.__dict__)
# {'value': '-1', 'inclusive': False}
import json
offset_json = json.dumps(offset.__dict__)
# '{"value": "-1", "inclusive": false}'
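One caveat with dumping `__dict__` directly: it works when value is a str or an int, but `json.dumps` raises a TypeError when value is a `datetime.datetime`, which the class also accepts. The sketch below shows one possible way to round-trip all three cases. The helper names `offset_to_json` and `offset_from_json` and the `"type"` tag are hypothetical, not part of the SDK, and the small `Offset` class here is only a stand-in with the same two attributes so the snippet runs without the package installed. It assumes Python 3.7+ for `datetime.fromisoformat`.

```python
import datetime
import json


class Offset(object):
    """Stand-in with the same two attributes as azure.eventhub.common.Offset."""

    def __init__(self, value, inclusive=False):
        self.value = value
        self.inclusive = inclusive


def offset_to_json(offset):
    """Serialize an offset to a JSON string, tagging the type of its value."""
    value = offset.value
    if isinstance(value, datetime.datetime):
        # datetime is not JSON serializable, so store it as an ISO 8601 string
        payload = {"type": "datetime", "value": value.isoformat()}
    elif isinstance(value, int):
        payload = {"type": "int", "value": value}
    else:
        payload = {"type": "str", "value": str(value)}
    payload["inclusive"] = offset.inclusive
    return json.dumps(payload)


def offset_from_json(text):
    """Rebuild an offset from a string produced by offset_to_json."""
    payload = json.loads(text)
    value = payload["value"]
    if payload["type"] == "datetime":
        value = datetime.datetime.fromisoformat(value)
    return Offset(value, payload["inclusive"])


restored = offset_from_json(offset_to_json(Offset("12345", True)))
print(restored.value, restored.inclusive)
# 12345 True
```

The resulting string can be written to whatever store you prefer (a file, a database row, a blob). With the real package, you would import Offset from azure.eventhub.common instead of defining the stand-in.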
Note: In the future, the GitHub repo Azure/azure-event-hubs-python will be completely moved to the GitHub repo Azure/azure-sdk-for-python, where the Offset class is renamed to the EventPosition class, with the same attributes value and inclusive.