Spring Kafka Producer: receive immediate error when broker is down
We have an HTTP endpoint that receives some data and sends it to Kafka. If the broker is down, we want to respond with an error immediately instead of retrying asynchronously. Is that possible?
What we are doing is starting the application, shutting down the broker and then sending a message to see what happens. We are sending the message with the blocking option described here:
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);
    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}
When the broker is down we get the following warning:
[Producer clientId=producer-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
After the time limit runs out we get a TimeoutException, but what we want is to catch the error immediately, at the moment we try to send. We have configured retries=0.
I would like to understand what happens when we send a message to Kafka while the broker is down.
This is the producer configuration:
acks = -1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 30000
enable.idempotence = true
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.UUIDSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 50
reconnect.backoff.ms = 50
request.timeout.ms = 10000
retries = 0
retry.backoff.ms = 200
transaction.timeout.ms = 60000
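For reference, the settings above that bound how long the blocking .get() can take are max.block.ms (how long send() may block waiting for metadata), request.timeout.ms and delivery.timeout.ms. Below is a minimal sketch of setting them programmatically on a Spring Kafka ProducerFactory; the class name, the concrete timeout values and the use of StringSerializer (to match ProducerRecord<String, String>) are illustrative assumptions, not values taken from the question:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

public class ProducerTimeoutSketch {

    // Builds a KafkaTemplate whose timeout-related properties are set explicitly,
    // so a send against an unreachable broker fails faster than with the defaults.
    public static KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        // Illustrative values: how long send() may block waiting for metadata,
        // the per-request timeout, and the overall deadline for a record.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 10000);

        ProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(props);
        return new KafkaTemplate<>(factory);
    }
}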
producer.send puts the data into an internal queue, and it is only sent to the broker when the producer flushes (which is the effect of calling .get()).
If you need to detect the connection before calling .send, then you actually need to establish a connection beforehand, for example with a call to the AdminClient.describeCluster method.
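A minimal sketch of such a pre-flight check, assuming the same localhost:9092 bootstrap server as above; the class name, the helper method and the 5-second timeout are illustrative choices rather than anything from the original answer:

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class BrokerReachabilityCheck {

    // Returns true if the cluster answers a describeCluster request in time,
    // false if no broker can be reached within the timeout.
    public static boolean brokerIsReachable() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);

        try (AdminClient admin = AdminClient.create(props)) {
            // nodes() completes once the metadata response arrives,
            // or fails/times out if the broker is unavailable.
            admin.describeCluster().nodes().get(5, TimeUnit.SECONDS);
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}

Note that creating an AdminClient is itself relatively expensive, so in practice it would normally be created once and reused; and even a successful check does not guarantee the broker is still up by the time the actual send happens.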