Problems with Amazon MSK default configuration and publishing with transactions
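Recently we started testing our Kafka connector against MSK, Amazon's managed Kafka service. Publishing records works fine, but not when transactions are enabled.
Our cluster consists of 2 brokers using the default MSK configuration (because we have 2 zones). We create our Java Kafka producer with the following properties: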
bootstrap.servers=x.us-east-1.amazonaws.com:9094,y.us-east-1.amazonaws.com:9094
client.id=kafkautil
max.block.ms=5000
request.timeout.ms=5000
security.protocol=SSL
transactional.id=transactions
However, when the producer is started with the transactional.id setting, which enables transactions, the initTransactions() method hangs:
producer = new KafkaProducer<Object, Object>(kafkaProperties);
if (kafkaProperties.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) {
// this hangs
producer.initTransactions();
}
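Since the call above blocks indefinitely, one way to keep a test run from stalling is to bound it from the outside. The following is a minimal sketch using only the JDK; `BoundedCall` and `finishesWithin` are hypothetical names, not part of the Kafka client, and the `Runnable` stands in for `producer.initTransactions()`.

```java
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper (not a Kafka API): bounds a call that may block forever. */
final class BoundedCall {
    private BoundedCall() {}

    /**
     * Runs blockingCall on a daemon worker thread and waits at most timeout.
     * Returns true if the call completed in time, false if it was still blocked
     * (or if the waiting thread was interrupted).
     */
    static boolean finishesWithin(Runnable blockingCall, Duration timeout) {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-call");
            t.setDaemon(true); // a stuck call must not keep the JVM alive
            return t;
        });
        try {
            Future<?> result = worker.submit(blockingCall);
            result.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // still blocked after the timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        } catch (ExecutionException e) {
            // The call itself failed; surface the original cause.
            throw new IllegalStateException(e.getCause());
        } finally {
            worker.shutdownNow(); // interrupts the worker; the call may ignore it
        }
    }
}
```

With the producer from the snippet above, this might be called as `BoundedCall.finishesWithin(producer::initTransactions, Duration.ofSeconds(10))`. It only reports the hang so the caller can log and abort; it does not address the underlying coordinator problem.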
Looking at the log output, we see the following sequence, which never seems to time out.
TransactionManager - Enqueuing transactional request (type=FindCoordinatorRequest,
coordinatorKey=y, coordinatorType=TRANSACTION)
TransactionManager - Request (type=FindCoordinatorRequest, coordinatorKey=y,
coordinatorType=TRANSACTION) dequeued for sending
NetworkClient - Found least loaded node z:9094 (id: -2 rack: null) connected with no
in-flight requests
Sender - Sending transactional request (type=FindCoordinatorRequest, coordinatorKey=y,
coordinatorType=TRANSACTION) to node z (id: -2 rack: null)
NetworkClient - Sending FIND_COORDINATOR {coordinator_key=y,coordinator_type=1} with
correlation id 424 to node -2
NetworkClient - Completed receive from node -2 for FIND_COORDINATOR with
correlation id 424, received {throttle_time_ms=0,error_code=15,error_message=null,
coordinator={node_id=-1,host=,port=-1}}
TransactionManager LogContext.java:129 - Received transactional response
FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null',
error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null)) for request
(type=FindCoordinatorRequest, coordinatorKey=xxx, coordinatorType=TRANSACTION)
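As far as I can tell, the brokers are available, and every host in the bootstrap.servers property is reachable. If I connect to each of them and publish without transactions, it works.
Any idea what we are missing?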
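It turns out this was a problem with the default AWS MSK properties and the number of brokers. If you create a Kafka cluster with fewer than 3 brokers, you need to adjust the settings below. With only 2 brokers, the internal topics that default to a replication factor of 3 (including the transaction state log) presumably cannot be created, which would explain the COORDINATOR_NOT_AVAILABLE response.
The following settings should (I think) be set to the number of brokers: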
Property | Kafka Default | AWS Default | Description |
---|---|---|---|
default.replication.factor | 1 | 3 | Default replication factor for automatically created topics. |
min.insync.replicas | 1 | 2 | Minimum number of replicas that must acknowledge a write for the write to be considered successful. |
offsets.topic.replication.factor | 3 | 3 | Replication factor for the internal offsets topic. |
transaction.state.log.replication.factor | 3 | 3 | Replication factor for the transaction topic. |
See the Kafka docs on broker properties.
Because we have 2 brokers, we ended up with:
default.replication.factor=2
min.insync.replicas=2
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
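This seems to have fixed the problem. IMHO this is a real issue with AWS MSK and its default configuration. They need to auto-generate the default configuration and adjust it to the number of brokers in the cluster.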
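The adjustment described above can be sketched as a small helper that derives the four overrides from the broker count. This is only an illustration: `SmallClusterOverrides` and its methods are hypothetical names, and capping the values at the MSK defaults (3 for the replication factors, 2 for min.insync.replicas) is an assumption on my part, not something MSK does for you. For 2 brokers it produces exactly the four lines shown above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: derives broker overrides for clusters with fewer than 3 brokers. */
final class SmallClusterOverrides {
    // MSK defaults taken from the table in the answer.
    private static final int MSK_REPLICATION_FACTOR = 3;
    private static final int MSK_MIN_INSYNC_REPLICAS = 2;

    private SmallClusterOverrides() {}

    /** Caps each setting at the broker count (assumed rule, see lead-in). */
    static Map<String, Integer> forBrokerCount(int brokerCount) {
        if (brokerCount < 1) {
            throw new IllegalArgumentException("brokerCount must be at least 1");
        }
        int replicationFactor = Math.min(brokerCount, MSK_REPLICATION_FACTOR);
        int minInsyncReplicas = Math.min(brokerCount, MSK_MIN_INSYNC_REPLICAS);

        // LinkedHashMap keeps the settings in the order used in the answer.
        Map<String, Integer> overrides = new LinkedHashMap<>();
        overrides.put("default.replication.factor", replicationFactor);
        overrides.put("min.insync.replicas", minInsyncReplicas);
        overrides.put("offsets.topic.replication.factor", replicationFactor);
        overrides.put("transaction.state.log.replication.factor", replicationFactor);
        return overrides;
    }

    /** Renders the overrides as key=value lines, one per setting. */
    static String toProperties(Map<String, Integer> overrides) {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<String, Integer> entry : overrides.entrySet()) {
            out.append(entry.getKey()).append('=').append(entry.getValue()).append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Prints the overrides for a 2-broker cluster.
        System.out.print(toProperties(forBrokerCount(2)));
    }
}
```

Note that with 2 brokers and min.insync.replicas=2, writes that require acknowledgement from all in-sync replicas cannot succeed while either broker is down, so this trades availability for the ability to use transactions at all.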