kafka消费者如何读取和处理高优先级消息?
How to read and process high priority messages in kafka consumer?
有什么方法可以优先处理高优先级的消息吗?
我尝试创建三个主题 'high'、'medium' 和 'low',并用一个消费者订阅了所有三个主题,如果 [=17] 中有未处理的消息=] 主题它会暂停其他两个。有没有更好的实现消息优先级的方法?
我尝试使用下面给出的逻辑。
topics = ['high', 'medium', 'low']
consumer.subscribe(topics)
high_topic_partition = TopicPartition(priority['high'], 0)
medium_topic_partition = TopicPartition(priority['medium'], 0)
low_topic_partition = TopicPartition(priority['low'], 0)
while True:
messages = consumer.poll()
high_priotity_unprocessed_msg = consumer.end_offsets([high_topic_partition])[high_topic_partition] - consumer.position(high_topic_partition)
medium_priotity_unprocessed_msg = consumer.end_offsets([medium_topic_partition])[medium_topic_partition] - consumer.position(medium_topic_partition)
low_priotity_unprocessed_msg = consumer.end_offsets([low_topic_partition])[low_topic_partition] - consumer.position(low_topic_partition)
if high_priotity_unprocessed_msg >0:
consumer.pause(medium_topic_partition)
consumer.pause(low_topic_partition)
else:
consumer.resume(medium_topic_partition)
if medium_priotity_unprocessed_msg >0:
consumer.pause(low_topic_partition)
else:
consumer.resume(low_topic_partition)
if messages:
process(messages)
您可以评估的一个选项基本上只是对更高优先级的消息具有更多的并行性...
例如:
Topic1 (Priority Low): 1 partitions
Topic2 (Priority medium): 5 partitions
Topic3 (Priority High): 20 partitions
然后基本上有:
- 1 消费者从 topic1
获取数据
- 5 位消费者来自 topic2
- 20 位消费者来自 topic3
现在,我建议您执行此操作的最简单方法基本上是编写一次代码...但是将“主题名称”的配置外部化...然后将其扩展(当然使用容器)...请参考这篇阅读:
例如,代码可以像这样简单:
SuperAwesomeAppBinaryCode:
topic = %MY_TOPIC_NAME_INJECTED_BY_ENV_VAR%
consumer.subscribe(topic)
while True:
messages = consumer.poll()
if messages:
process(messages)
现在,如果我们将该代码部署在 K8s 上,您可以有 3 种不同的部署,运行 相同的代码,但为每种情况注入正确的主题,例如:
低优先级邮件
apiVersion: apps/v1
kind: Deployment
metadata:
name: LowPriorityProcessor
labels:
app: LowPriorityProcessor
spec:
replicas: 1
selector:
matchLabels:
app: LowPriorityProcessor
template:
metadata:
labels:
app: LowPriorityProcessor
spec:
containers:
- name: LowPriorityProcessor
image: SuperAwesomeAppBinaryCode:1.0.0
env:
- name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
value: topic1
ports:
- containerPort: 80
中优先级邮件
apiVersion: apps/v1
kind: Deployment
metadata:
name: MediumPriorityProcessor
labels:
app: MediumPriorityProcessor
spec:
replicas: 5
selector:
matchLabels:
app: MediumPriorityProcessor
template:
metadata:
labels:
app: MediumPriorityProcessor
spec:
containers:
- name: MediumPriorityProcessor
image: SuperAwesomeAppBinaryCode:1.0.0
env:
- name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
value: topic2
ports:
- containerPort: 80
高优先级邮件
apiVersion: apps/v1
kind: Deployment
metadata:
name: HighPriorityProcessor
labels:
app: HighPriorityProcessor
spec:
replicas: 20
selector:
matchLabels:
app: HighPriorityProcessor
template:
metadata:
labels:
app: HighPriorityProcessor
spec:
containers:
- name: HighPriorityProcessor
image: SuperAwesomeAppBinaryCode:1.0.0
env:
- name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
value: topic3
ports:
- containerPort: 80
然后让并行性发挥它的魔力
如果您仔细检查,从一个“k8s 部署”到另一个“k8s 部署”的唯一变化就是主题和副本数。
备注:
- 你可以在没有 K8s 的情况下实现这一点....使用 Docker Swarm 甚至只是 docker-compose 或 运行 手动实例 ♂️,但你为什么要重新发明wheel,但肯定在某些边缘情况下,没有太多选择...
- 可以找到有关此主题的精彩读物 here
有什么方法可以优先处理高优先级的消息吗?
我尝试创建三个主题 'high'、'medium' 和 'low',并用一个消费者订阅了所有三个主题,如果 [=17] 中有未处理的消息=] 主题它会暂停其他两个。有没有更好的实现消息优先级的方法?
我尝试使用下面给出的逻辑。
topics = ['high', 'medium', 'low']
consumer.subscribe(topics)
high_topic_partition = TopicPartition(priority['high'], 0)
medium_topic_partition = TopicPartition(priority['medium'], 0)
low_topic_partition = TopicPartition(priority['low'], 0)
while True:
messages = consumer.poll()
high_priotity_unprocessed_msg = consumer.end_offsets([high_topic_partition])[high_topic_partition] - consumer.position(high_topic_partition)
medium_priotity_unprocessed_msg = consumer.end_offsets([medium_topic_partition])[medium_topic_partition] - consumer.position(medium_topic_partition)
low_priotity_unprocessed_msg = consumer.end_offsets([low_topic_partition])[low_topic_partition] - consumer.position(low_topic_partition)
if high_priotity_unprocessed_msg >0:
consumer.pause(medium_topic_partition)
consumer.pause(low_topic_partition)
else:
consumer.resume(medium_topic_partition)
if medium_priotity_unprocessed_msg >0:
consumer.pause(low_topic_partition)
else:
consumer.resume(low_topic_partition)
if messages:
process(messages)
您可以评估的一个选项基本上只是对更高优先级的消息具有更多的并行性...
例如:
Topic1 (Priority Low): 1 partitions
Topic2 (Priority medium): 5 partitions
Topic3 (Priority High): 20 partitions
然后基本上有:
- 1 消费者从 topic1 获取数据
- 5 位消费者来自 topic2
- 20 位消费者来自 topic3
现在,我建议您执行此操作的最简单方法基本上是编写一次代码...但是将“主题名称”的配置外部化...然后将其扩展(当然使用容器)...请参考这篇阅读:
例如,代码可以像这样简单:
SuperAwesomeAppBinaryCode:
topic = %MY_TOPIC_NAME_INJECTED_BY_ENV_VAR%
consumer.subscribe(topic)
while True:
messages = consumer.poll()
if messages:
process(messages)
现在,如果我们将该代码部署在 K8s 上,您可以有 3 种不同的部署,运行 相同的代码,但为每种情况注入正确的主题,例如:
低优先级邮件
apiVersion: apps/v1
kind: Deployment
metadata:
name: LowPriorityProcessor
labels:
app: LowPriorityProcessor
spec:
replicas: 1
selector:
matchLabels:
app: LowPriorityProcessor
template:
metadata:
labels:
app: LowPriorityProcessor
spec:
containers:
- name: LowPriorityProcessor
image: SuperAwesomeAppBinaryCode:1.0.0
env:
- name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
value: topic1
ports:
- containerPort: 80
中优先级邮件
apiVersion: apps/v1
kind: Deployment
metadata:
name: MediumPriorityProcessor
labels:
app: MediumPriorityProcessor
spec:
replicas: 5
selector:
matchLabels:
app: MediumPriorityProcessor
template:
metadata:
labels:
app: MediumPriorityProcessor
spec:
containers:
- name: MediumPriorityProcessor
image: SuperAwesomeAppBinaryCode:1.0.0
env:
- name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
value: topic2
ports:
- containerPort: 80
高优先级邮件
apiVersion: apps/v1
kind: Deployment
metadata:
name: HighPriorityProcessor
labels:
app: HighPriorityProcessor
spec:
replicas: 20
selector:
matchLabels:
app: HighPriorityProcessor
template:
metadata:
labels:
app: HighPriorityProcessor
spec:
containers:
- name: HighPriorityProcessor
image: SuperAwesomeAppBinaryCode:1.0.0
env:
- name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
value: topic3
ports:
- containerPort: 80
然后让并行性发挥它的魔力 如果您仔细检查,从一个“k8s 部署”到另一个“k8s 部署”的唯一变化就是主题和副本数。
备注:
- 你可以在没有 K8s 的情况下实现这一点....使用 Docker Swarm 甚至只是 docker-compose 或 运行 手动实例 ♂️,但你为什么要重新发明wheel,但肯定在某些边缘情况下,没有太多选择...
- 可以找到有关此主题的精彩读物 here