How can I store azure.eventhub.common.Offset in Python?

According to the official documentation for Azure Event Hubs, consumers are responsible for managing offsets. Quoting:

Consumers are responsible for storing their own offset values outside of the Event Hubs service.

However, looking at the API doc for the Event Hubs Offset class, it clearly provides no serialization or any other way to store it.

So my question is: how should I store an Event Hubs offset?

Take a close look at common.py in the GitHub repo Azure/azure-event-hubs-python. The Offset class is defined at line 253 of that file as follows:

class Offset(object):
    """
    The offset (position or timestamp) where a receiver starts. Examples:
    Beginning of the event stream:
      >>> offset = Offset("-1")
    End of the event stream:
      >>> offset = Offset("@latest")
    Events after the specified offset:
      >>> offset = Offset("12345")
    Events from the specified offset:
      >>> offset = Offset("12345", True)
    Events after a datetime:
      >>> offset = Offset(datetime.datetime.utcnow())
    Events after a specific sequence number:
      >>> offset = Offset(1506968696002)
    """

    def __init__(self, value, inclusive=False):
        """
        Initialize Offset.
        :param value: The offset value.
        :type value: ~datetime.datetime or int or str
        :param inclusive: Whether to include the supplied value as the start point.
        :type inclusive: bool
        """
        self.value = value
        self.inclusive = inclusive

    def selector(self):
        """
        Creates a selector expression of the offset.
        :rtype: bytes
        """
        operator = ">=" if self.inclusive else ">"
        if isinstance(self.value, datetime.datetime):
            timestamp = (calendar.timegm(self.value.utctimetuple()) * 1000) + (self.value.microsecond/1000)
            return ("amqp.annotation.x-opt-enqueued-time {} '{}'".format(operator, int(timestamp))).encode('utf-8')
        if isinstance(self.value, six.integer_types):
            return ("amqp.annotation.x-opt-sequence-number {} '{}'".format(operator, self.value)).encode('utf-8')
        return ("amqp.annotation.x-opt-offset {} '{}'".format(operator, self.value)).encode('utf-8')

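As the source code shows, Offset is just an ordinary Python class with two attributes, value and inclusive. You can simply store the values of those attributes as a JSON string (or in any other form), or just extract the values directly, as in the sample code below.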

import json

from azure.eventhub.common import Offset

offset = Offset("-1")
print(offset.value, offset.inclusive)
# -1 False
print(offset.__dict__)
# {'value': '-1', 'inclusive': False}
offset_json = json.dumps(offset.__dict__)
# '{"value": "-1", "inclusive": false}'
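
One caveat: json.dumps(offset.__dict__) works for str and int values, but it raises TypeError when value is a datetime.datetime, which the Offset docstring also allows. Below is a minimal sketch of a round trip that covers all three value types, assuming Python 3.7+ (for datetime.fromisoformat). The Offset class in the snippet is only a stand-in with the same two attributes, so the code runs without the SDK installed; offset_to_json and offset_from_json are hypothetical helper names, not part of the library.

```python
import datetime
import json


class Offset(object):
    """Stand-in with the same two attributes as azure.eventhub.common.Offset."""

    def __init__(self, value, inclusive=False):
        self.value = value
        self.inclusive = inclusive


def offset_to_json(offset):
    """Serialize an Offset-like object to a JSON string (hypothetical helper)."""
    value = offset.value
    if isinstance(value, datetime.datetime):
        # datetime is not JSON-serializable, so store ISO 8601 text plus a type tag.
        payload = {"type": "datetime", "value": value.isoformat()}
    else:
        # str (offset) and int (sequence number) survive JSON unchanged.
        payload = {"type": "plain", "value": value}
    payload["inclusive"] = offset.inclusive
    return json.dumps(payload)


def offset_from_json(text):
    """Rebuild an Offset from a string produced by offset_to_json (hypothetical helper)."""
    payload = json.loads(text)
    value = payload["value"]
    if payload["type"] == "datetime":
        value = datetime.datetime.fromisoformat(value)
    return Offset(value, payload["inclusive"])


restored = offset_from_json(offset_to_json(Offset("12345", True)))
print(restored.value, restored.inclusive)
# 12345 True
```

The resulting string can be written to whatever store you prefer (a file, a database row, a blob) and keyed per partition and consumer group. With the real SDK, replace the stand-in class with the import from azure.eventhub.common.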

Note: In the future, the GitHub repo Azure/azure-event-hubs-python will be completely moved to the GitHub repo Azure/azure-sdk-for-python. As part of that change, the Offset class is renamed to EventPosition, which has the same attributes value and inclusive.