如何使用kafka-python订阅多个kafka通配符模式的列表?
How to subscribe to a list of multiple kafka wildcard patterns using kafka-python?
我正在使用带有通配符的模式订阅 Kafka,如下所示。通配符表示动态客户 ID。
consumer.subscribe(pattern='customer.*.validations')
这很有效,因为我可以从主题字符串中提取客户 ID。但现在我需要扩展功能,以便出于稍微不同的目的收听类似的主题。我们称它为 customer.*.additional-validations
。代码需要存在于同一个项目中,因为共享了很多功能,但我需要能够根据队列类型采取不同的路径。
在 Kafka documentation 中,我可以看到可以订阅一系列主题。然而,这些是硬编码的字符串。不是允许灵活性的模式。
>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
... assert isinstance(msg.value, dict)
所以我想知道是否有可能以某种方式将两者结合起来?有点像这样(不工作):
consumer.subscribe(pattern=['customer.*.validations', 'customer.*.additional-validations'])
在 KafkaConsumer 代码中,它支持主题列表或模式,
def subscribe(self, topics=(), pattern=None, listener=None):
"""Subscribe to a list of topics, or a topic regex pattern
Partitions will be dynamically assigned via a group coordinator.
Topic subscriptions are not incremental: this list will replace the
current assignment (if there is one).
因此您可以创建一个正则表达式,使用 |
或条件,它应该作为订阅多个动态主题正则表达式,因为它在内部使用 re
模块进行匹配。
(customer.*.validations)|(customer.*.additional-validations)
在 Confluent Kafka library 中,订阅没有 pattern
关键字,而是处理以 ^
.
开头的正则表达式模式
def subscribe(self, topics, on_assign=None, *args, **kwargs):
"""
Set subscription to a supplied list of topics
This replaces a previous subscription.
Regexp pattern subscriptions are supported by prefixing the topic string with ``"^"``, e.g.::
consumer.subscribe(["^my_topic.*", "^another[0-9]-?[a-z]+$", "not_a_regex"])
"""
我正在使用带有通配符的模式订阅 Kafka,如下所示。通配符表示动态客户 ID。
consumer.subscribe(pattern='customer.*.validations')
这很有效,因为我可以从主题字符串中提取客户 ID。但现在我需要扩展功能,以便出于稍微不同的目的收听类似的主题。我们称它为 customer.*.additional-validations
。代码需要存在于同一个项目中,因为共享了很多功能,但我需要能够根据队列类型采取不同的路径。
在 Kafka documentation 中,我可以看到可以订阅一系列主题。然而,这些是硬编码的字符串。不是允许灵活性的模式。
>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
... assert isinstance(msg.value, dict)
所以我想知道是否有可能以某种方式将两者结合起来?有点像这样(不工作):
consumer.subscribe(pattern=['customer.*.validations', 'customer.*.additional-validations'])
在 KafkaConsumer 代码中,它支持主题列表或模式,
def subscribe(self, topics=(), pattern=None, listener=None):
"""Subscribe to a list of topics, or a topic regex pattern
Partitions will be dynamically assigned via a group coordinator.
Topic subscriptions are not incremental: this list will replace the
current assignment (if there is one).
因此您可以创建一个正则表达式,使用 |
或条件,它应该作为订阅多个动态主题正则表达式,因为它在内部使用 re
模块进行匹配。
(customer.*.validations)|(customer.*.additional-validations)
在 Confluent Kafka library 中,订阅没有 pattern
关键字,而是处理以 ^
.
def subscribe(self, topics, on_assign=None, *args, **kwargs):
"""
Set subscription to a supplied list of topics
This replaces a previous subscription.
Regexp pattern subscriptions are supported by prefixing the topic string with ``"^"``, e.g.::
consumer.subscribe(["^my_topic.*", "^another[0-9]-?[a-z]+$", "not_a_regex"])
"""