如何在 Kafka 中处理一次消息,以便重新启动时服务不会处理所有消息
How to process messages in Kafka once, so a Service when is restarted doesnt process all messages
第一次使用Kafka,我使用微服务架构学习Kafka,我正在寻找下一个问题。
每次我重新启动时,我的服务都在处理主题中的所有消息。有没有一种方法我只能处理这些消息一次,将它们标记为已读或其他什么?
这是我在 Pytho 3 中的代码片段:
class EmailStreamConsumer:
def __init__(self, bootstrap_servers='localhost:9092'):
self.__bootstrap_servers = bootstrap_servers
self.__new_emails_consumer = KafkaConsumer('NewEmails', bootstrap_servers=bootstrap_servers,
auto_offset_reset='earliest')
self.__sent_emails_consumer = KafkaConsumer('SentEmails', bootstrap_servers=bootstrap_servers,
auto_offset_reset='earliest')
def start(self):
for message in self.__new_emails_consumer:
value = message.value.decode('utf-8')
email = json.loads(value)
self.send_email(email['content'], email['to_email'], email['title'], email['from_email'])
print("%s:%d:%d: key=%s value=%s" % (
message.topic, message.partition, message.offset, message.key, message.value))
我希望该服务只发送一次电子邮件。即使重新启动服务。
如果消费者向卡夫卡确认它已经阅读了消息。那么我们就不会有这个问题了。
这可以通过两种方式完成。
方法 1:在我们收到消息后启用自动提交。
对于这种方法,我们需要添加值为 true 的 属性 enable.auto.commit。
方法 2:如果我们需要编程控制,我们可以使用 commitSync() 和 commitAsync()。
我认为你的问题是你的 Kafka-Consumer
没有 GROUP ID
只需添加:
String groupId = "kafka-new-emails";
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
Your application will start read from the latest email as your
consumer group labeled where the last commit you read was. Also, if you have more than one consumer and one of them gets down, consumer group will help you in making a rebalance as to make the consumer that is online to read from the partition that was assigned to the consumer that is down.
第一次使用Kafka,我使用微服务架构学习Kafka,我正在寻找下一个问题。
每次我重新启动时,我的服务都在处理主题中的所有消息。有没有一种方法我只能处理这些消息一次,将它们标记为已读或其他什么?
这是我在 Pytho 3 中的代码片段:
class EmailStreamConsumer:
def __init__(self, bootstrap_servers='localhost:9092'):
self.__bootstrap_servers = bootstrap_servers
self.__new_emails_consumer = KafkaConsumer('NewEmails', bootstrap_servers=bootstrap_servers,
auto_offset_reset='earliest')
self.__sent_emails_consumer = KafkaConsumer('SentEmails', bootstrap_servers=bootstrap_servers,
auto_offset_reset='earliest')
def start(self):
for message in self.__new_emails_consumer:
value = message.value.decode('utf-8')
email = json.loads(value)
self.send_email(email['content'], email['to_email'], email['title'], email['from_email'])
print("%s:%d:%d: key=%s value=%s" % (
message.topic, message.partition, message.offset, message.key, message.value))
我希望该服务只发送一次电子邮件。即使重新启动服务。
如果消费者向卡夫卡确认它已经阅读了消息。那么我们就不会有这个问题了。
这可以通过两种方式完成。
方法 1:在我们收到消息后启用自动提交。
对于这种方法,我们需要添加值为 true 的 属性 enable.auto.commit。
方法 2:如果我们需要编程控制,我们可以使用 commitSync() 和 commitAsync()。
我认为你的问题是你的 Kafka-Consumer
GROUP ID
只需添加:
String groupId = "kafka-new-emails";
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
Your application will start read from the latest email as your consumer group labeled where the last commit you read was. Also, if you have more than one consumer and one of them gets down, consumer group will help you in making a rebalance as to make the consumer that is online to read from the partition that was assigned to the consumer that is down.