Confluent Kafka Python 生产者未使用 ACKS= 所有配置生产

Confluent Kafka Python producer not producing with ACKS= all configuration

我有一些 python 代码将生成一个 kafka 主题,这在默认设置 acks=1 下工作正常但是当我更改为 acks=allacks=2 消息没有在主题中结束。该主题的 min.insync.replicas 配置设置为 2。 运行 代码后没有返回错误信息这是什么意思?集群中有 3 个代理。

这是代码

from confluent_kafka import Producer
from kafka.errors import KafkaError

def get_producer_config():
    return Producer(get_config())


def get_config():
    conf = {
        'bootstrap.servers': 'localhost:9092',
        'acks': '2',
    }

    return conf

try:
    producer = get_producer_config()
    producer.produce('test', 'test message from local app')
    producer.flush()
except KafkaError as error:
    get_logger().error(str(error))

这源于调试 kafka 生产者 lambda,我们在其中收到错误消息 KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"} 我尝试在本地进行顶级复制,但没有收到错误,但在尝试时注意到了这一点。

谢谢

  1. 确认:2 - 不允许。

The number of acknowledgments the producer requires the leader to have received before considering a request complete. Allowed values: 0 for no acknowledgments, 1 for only the leader and -1 (or all) for the full ISR.

  1. 您必须检查主题级别配置。两个参数相关。
    min.insync.replicas(ISR) <= replication-factor

acks 只能有三个值:

  1. acks = 1:这是默认值,只有领导者将消息写入其日志,但会在不等待所有追随者的完全确认的情况下做出响应。
  2. acks = 0:生产者根本不会等待服务器的任何确认。该记录将立即添加到套接字缓冲区并视为已发送。
  3. acks =all:这意味着领导者将等待所有同步副本确认记录。这保证只要至少一个同步副本保持活动状态,记录就不会丢失。这是最有力的保证。

此外,请在 min.insyn.replica 中指定一个有效值,其中应包含编号。包括领导者在内的副本数量。

例如,如果您的复制因子是 4,并且您的 min.insync.replica = 3,那么它是 acks=all 的有效配置。