exactly once delivery 是否可以通过 spring-cloud-stream-binder-kafka 或 spring-kafka 使用哪个
exactly once delivery Is it possible through spring-cloud-stream-binder-kafka or spring-kafka which one to use
我正在尝试在 spring 启动应用程序中使用 spring-cloud-stream-binder-kafka 实现一次交付。
我使用的版本是:
- spring-cloud-stream-binder-kafka-core-1.2.1.RELEASE
- spring-cloud-stream-binder-kafka-1.2.1.RELEASE
- spring-cloud-stream-codec-1.2.2.RELEASE spring-kafka-1.1.6.RELEASE
- spring-integration-kafka-2.1.0.RELEASE
- spring-integration-core-4.3.10.RELEASE
- zookeeper-3.4.8
- 卡夫卡版本:0.10.1.1
这是我的配置(云配置):
spring:
autoconfigure:
exclude: org.springframework.cloud.netflix.metrics.servo.ServoMetricsAutoConfiguration
kafka:
consumer:
enable-auto-commit: false
cloud:
stream:
kafka:
binder:
brokers: "${BROKER_HOST:xyz-aws.local:9092}"
headers:
- X-B3-TraceId
- X-B3-SpanId
- X-B3-Sampled
- X-B3-ParentSpanId
- X-Span-Name
- X-Process-Id
zkNodes: "${ZOOKEEPER_HOST:120.211.316.261:2181,120.211.317.252:2181}"
bindings:
feed_platform_events_input:
consumer:
autoCommitOffset: false
binders:
xyzkafka:
type: kafka
bindings:
feed_platform_events_input:
binder: xyzkafka
destination: platform-events
group: br-platform-events
我有两个主要的 classes:
FeedSink 接口:
package au.com.xyz.proxy.interfaces;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.MessageChannel;
public interface FeedSink {
String FEED_PLATFORM_EVENTS_INPUT = "feed_platform_events_input";
@Input(FeedSink.FEED_PLATFORM_EVENTS_INPUT)
MessageChannel feedlatformEventsInput();
}
EventConsumer
package au.com.xyz.proxy.consumer;
@Slf4j
@EnableBinding(FeedSink.class)
public class EventConsumer {
public static final String SUCCESS_MESSAGE =
"SEND-SUCCESS : Successfully sent message to platform.";
public static final String FAULT_MESSAGE = "SOAP-FAULT Code: {}, Description: {}";
public static final String CONNECT_ERROR_MESSAGE = "CONNECT-ERROR Error Details: {}";
public static final String EMPTY_NOTIFICATION_ERROR_MESSAGE =
"EMPTY-NOTIFICATION-ERROR Empty Event Received from platform";
@Autowired
private CapPointService service;
@StreamListener(FeedSink.FEED_PLATFORM_EVENTS_INPUT)
/**
* method associated with stream to process message.
*/
public void message(final @Payload EventNotification eventNotification,
final @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
String caseMilestone = "UNKNOWN";
if (!ObjectUtils.isEmpty(eventNotification)) {
SysMessage sysMessage = processPayload(eventNotification);
caseMilestone = sysMessage.getCaseMilestone();
try {
ClientResponse response = service.sendPayload(sysMessage);
if (response.hasFault()) {
Fault faultDetails = response.getFaultDetails();
log.error(FAULT_MESSAGE, faultDetails.getCode(), faultDetails.getDescription());
} else {
log.info(SUCCESS_MESSAGE);
}
acknowledgment.acknowledge();
} catch (Exception e) {
log.error(CONNECT_ERROR_MESSAGE, e.getMessage());
}
} else {
log.error(EMPTY_NOTIFICATION_ERROR_MESSAGE);
acknowledgment.acknowledge();
}
}
private SysMessage processPayload(final EventNotification eventNotification) {
Gson gson = new Gson();
String jsonString = gson.toJson(eventNotification.getData());
log.info("Consumed message for platform events with payload : {} ", jsonString);
SysMessage sysMessage = gson.fromJson(jsonString, SysMessage.class);
return sysMessage;
}
}
我已将 Kafka 和 spring 容器的自动提交 属性 设置为 false。
如果您在 EventConsumer class 中看到我在 service.sendPayload 成功并且没有异常的情况下使用了 Acknowledge。我希望容器移动偏移量并轮询下一条记录。
我观察到的是:
场景 1 - 如果抛出异常并且没有在 kafka 上发布新消息。没有重试处理消息,似乎没有activity。即使解决了根本问题。我指的问题是下游服务器不可用。有没有办法重试处理n次然后放弃。请注意,这是从上次提交的偏移量重试处理或重新轮询。这与 Kafka 实例不可用无关。
如果我重新启动服务(EC2 实例),那么处理将从最后一次成功确认完成的偏移量开始。
场景 2 - 如果发生异常,然后将后续消息推送到 kafka。我看到新消息已处理并且偏移量已移动。这意味着我丢失了未确认的消息。所以问题是我是否已经处理了确认。我如何控制从上次提交读取而不仅仅是最新消息并处理它。我假设内部正在进行民意调查,但它没有考虑或不知道最后一条消息未被确认。我不认为有多个线程从 kafka 读取。我不知道@Input 和@StreamListener 注释是如何控制的。我假设线程由控制线程的 属性 consumer.concurrency 控制,默认情况下它设置为 1.
所以我做了研究并找到了很多链接,但不幸的是 none 其中回答了我的具体问题。
我看了 (https://github.com/spring-cloud/spring-cloud-stream/issues/575)
其中有来自 Marius (https://whosebug.com/users/809122/marius-bogoevici) 的评论:
Do note that Kafka does not provide individual message acking, which
means that acknowledgment translates into updating the latest consumed
offset to the offset of the acked message (per topic/partition). That
means that if you're acking messages from the same topic partition out
of order, a message can 'ack' all the messages before it.
不确定是不是单线程时的顺序问题
很抱歉 post,但我想提供足够的信息。最主要的是我试图避免在从 kafka 消费时丢失消息,我想看看 spring-cloud-stream-binder-kafka 是否可以完成这项工作,或者我必须寻找替代方案。
2018 年 7 月 6 日更新
我看到了这个posthttps://github.com/spring-projects/spring-kafka/issues/431
这是解决我的问题的更好方法吗?我可以试试最新版本的 spring-kafka
@KafkaListener(id = "qux", topics = "annotated4", containerFactory = "kafkaManualAckListenerContainerFactory",
containerGroup = "quxGroup")
public void listen4(@Payload String foo, Acknowledgment ack, Consumer<?, ?> consumer) {
- 这是否有助于控制偏移量设置到最后一个位置
成功处理记录?我怎样才能从聆听中做到这一点
方法。 consumer.seekToEnd();然后监听方法将如何重置以获取该记录?
- 将Consumer放在签名中是否支持获取
处理消费者?或者我需要做更多的事情?
- 我应该使用 Acknowledge 还是 consumer.commitSyncy()
- containerFactory有什么意义。我必须定义它吗
作为一颗豆子。
- 我需要@EnableKafka 和@Configuration 才能使上述方法起作用吗?
请记住,该应用程序是 Spring 启动应用程序。
- 通过将消费者添加到监听方法我不需要实现
ConsumerAware 界面?
最后但同样重要的是,如果可行,是否可以提供上述方法的一些示例。
2018 年 7 月 12 日更新
感谢 Gary (https://whosebug.com/users/1240763/gary-russell) 提供使用 maxAttempts 的提示。我用过这种方法。并且我能够做到exactly once delivery并且保持消息的顺序。
我更新的云配置:
spring:
autoconfigure:
exclude: org.springframework.cloud.netflix.metrics.servo.ServoMetricsAutoConfiguration
kafka:
consumer:
enable-auto-commit: false
cloud:
stream:
kafka:
binder:
brokers: "${BROKER_HOST:xyz-aws.local:9092}"
headers:
- X-B3-TraceId
- X-B3-SpanId
- X-B3-Sampled
- X-B3-ParentSpanId
- X-Span-Name
- X-Process-Id
zkNodes: "${ZOOKEEPER_HOST:120.211.316.261:2181,120.211.317.252:2181}"
bindings:
feed_platform_events_input:
consumer:
autoCommitOffset: false
binders:
xyzkafka:
type: kafka
bindings:
feed_platform_events_input:
binder: xyzkafka
destination: platform-events
group: br-platform-events
consumer:
maxAttempts: 2147483647
backOffInitialInterval: 1000
backOffMaxInterval: 300000
backOffMultiplier: 2.0
Event Consumer 与我的初始实现相同。除了重新抛出容器知道处理失败的错误。如果您只是捕获它,那么容器就无法知道消息处理失败。通过执行 acknoweldgement.acknowledge 你只是控制偏移提交。为了重试发生,您必须抛出异常。不要忘记将 kafka 客户端自动提交 属性 和 spring(容器级别)autocommitOffset 属性 设置为 false。就是这样。
正如 Marius 所解释的,Kafka 只在日志中维护一个偏移量。如果处理下一条消息,并更新偏移量;失败的消息丢失。
您可以将失败的消息发送到死信主题(将 enableDlq
设置为 true)。
最新版本的 Spring Kafka (2.1.x) 有特殊的错误处理程序 ContainerStoppingErrorHandler
会在发生异常时停止容器, SeekToCurrentErrorHandler
会导致重新传递失败的消息。
我正在尝试在 spring 启动应用程序中使用 spring-cloud-stream-binder-kafka 实现一次交付。 我使用的版本是:
- spring-cloud-stream-binder-kafka-core-1.2.1.RELEASE
- spring-cloud-stream-binder-kafka-1.2.1.RELEASE
- spring-cloud-stream-codec-1.2.2.RELEASE spring-kafka-1.1.6.RELEASE
- spring-integration-kafka-2.1.0.RELEASE
- spring-integration-core-4.3.10.RELEASE
- zookeeper-3.4.8
- 卡夫卡版本:0.10.1.1
这是我的配置(云配置):
spring:
autoconfigure:
exclude: org.springframework.cloud.netflix.metrics.servo.ServoMetricsAutoConfiguration
kafka:
consumer:
enable-auto-commit: false
cloud:
stream:
kafka:
binder:
brokers: "${BROKER_HOST:xyz-aws.local:9092}"
headers:
- X-B3-TraceId
- X-B3-SpanId
- X-B3-Sampled
- X-B3-ParentSpanId
- X-Span-Name
- X-Process-Id
zkNodes: "${ZOOKEEPER_HOST:120.211.316.261:2181,120.211.317.252:2181}"
bindings:
feed_platform_events_input:
consumer:
autoCommitOffset: false
binders:
xyzkafka:
type: kafka
bindings:
feed_platform_events_input:
binder: xyzkafka
destination: platform-events
group: br-platform-events
我有两个主要的 classes: FeedSink 接口:
package au.com.xyz.proxy.interfaces;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.MessageChannel;
public interface FeedSink {
String FEED_PLATFORM_EVENTS_INPUT = "feed_platform_events_input";
@Input(FeedSink.FEED_PLATFORM_EVENTS_INPUT)
MessageChannel feedlatformEventsInput();
}
EventConsumer
package au.com.xyz.proxy.consumer;
@Slf4j
@EnableBinding(FeedSink.class)
public class EventConsumer {
public static final String SUCCESS_MESSAGE =
"SEND-SUCCESS : Successfully sent message to platform.";
public static final String FAULT_MESSAGE = "SOAP-FAULT Code: {}, Description: {}";
public static final String CONNECT_ERROR_MESSAGE = "CONNECT-ERROR Error Details: {}";
public static final String EMPTY_NOTIFICATION_ERROR_MESSAGE =
"EMPTY-NOTIFICATION-ERROR Empty Event Received from platform";
@Autowired
private CapPointService service;
@StreamListener(FeedSink.FEED_PLATFORM_EVENTS_INPUT)
/**
* method associated with stream to process message.
*/
public void message(final @Payload EventNotification eventNotification,
final @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
String caseMilestone = "UNKNOWN";
if (!ObjectUtils.isEmpty(eventNotification)) {
SysMessage sysMessage = processPayload(eventNotification);
caseMilestone = sysMessage.getCaseMilestone();
try {
ClientResponse response = service.sendPayload(sysMessage);
if (response.hasFault()) {
Fault faultDetails = response.getFaultDetails();
log.error(FAULT_MESSAGE, faultDetails.getCode(), faultDetails.getDescription());
} else {
log.info(SUCCESS_MESSAGE);
}
acknowledgment.acknowledge();
} catch (Exception e) {
log.error(CONNECT_ERROR_MESSAGE, e.getMessage());
}
} else {
log.error(EMPTY_NOTIFICATION_ERROR_MESSAGE);
acknowledgment.acknowledge();
}
}
private SysMessage processPayload(final EventNotification eventNotification) {
Gson gson = new Gson();
String jsonString = gson.toJson(eventNotification.getData());
log.info("Consumed message for platform events with payload : {} ", jsonString);
SysMessage sysMessage = gson.fromJson(jsonString, SysMessage.class);
return sysMessage;
}
}
我已将 Kafka 和 spring 容器的自动提交 属性 设置为 false。 如果您在 EventConsumer class 中看到我在 service.sendPayload 成功并且没有异常的情况下使用了 Acknowledge。我希望容器移动偏移量并轮询下一条记录。 我观察到的是:
场景 1 - 如果抛出异常并且没有在 kafka 上发布新消息。没有重试处理消息,似乎没有activity。即使解决了根本问题。我指的问题是下游服务器不可用。有没有办法重试处理n次然后放弃。请注意,这是从上次提交的偏移量重试处理或重新轮询。这与 Kafka 实例不可用无关。 如果我重新启动服务(EC2 实例),那么处理将从最后一次成功确认完成的偏移量开始。
场景 2 - 如果发生异常,然后将后续消息推送到 kafka。我看到新消息已处理并且偏移量已移动。这意味着我丢失了未确认的消息。所以问题是我是否已经处理了确认。我如何控制从上次提交读取而不仅仅是最新消息并处理它。我假设内部正在进行民意调查,但它没有考虑或不知道最后一条消息未被确认。我不认为有多个线程从 kafka 读取。我不知道@Input 和@StreamListener 注释是如何控制的。我假设线程由控制线程的 属性 consumer.concurrency 控制,默认情况下它设置为 1.
所以我做了研究并找到了很多链接,但不幸的是 none 其中回答了我的具体问题。 我看了 (https://github.com/spring-cloud/spring-cloud-stream/issues/575) 其中有来自 Marius (https://whosebug.com/users/809122/marius-bogoevici) 的评论:
Do note that Kafka does not provide individual message acking, which means that acknowledgment translates into updating the latest consumed offset to the offset of the acked message (per topic/partition). That means that if you're acking messages from the same topic partition out of order, a message can 'ack' all the messages before it.
不确定是不是单线程时的顺序问题
很抱歉 post,但我想提供足够的信息。最主要的是我试图避免在从 kafka 消费时丢失消息,我想看看 spring-cloud-stream-binder-kafka 是否可以完成这项工作,或者我必须寻找替代方案。
2018 年 7 月 6 日更新
我看到了这个posthttps://github.com/spring-projects/spring-kafka/issues/431 这是解决我的问题的更好方法吗?我可以试试最新版本的 spring-kafka
@KafkaListener(id = "qux", topics = "annotated4", containerFactory = "kafkaManualAckListenerContainerFactory",
containerGroup = "quxGroup")
public void listen4(@Payload String foo, Acknowledgment ack, Consumer<?, ?> consumer) {
- 这是否有助于控制偏移量设置到最后一个位置 成功处理记录?我怎样才能从聆听中做到这一点 方法。 consumer.seekToEnd();然后监听方法将如何重置以获取该记录?
- 将Consumer放在签名中是否支持获取 处理消费者?或者我需要做更多的事情?
- 我应该使用 Acknowledge 还是 consumer.commitSyncy()
- containerFactory有什么意义。我必须定义它吗 作为一颗豆子。
- 我需要@EnableKafka 和@Configuration 才能使上述方法起作用吗? 请记住,该应用程序是 Spring 启动应用程序。
- 通过将消费者添加到监听方法我不需要实现 ConsumerAware 界面?
最后但同样重要的是,如果可行,是否可以提供上述方法的一些示例。
2018 年 7 月 12 日更新
感谢 Gary (https://whosebug.com/users/1240763/gary-russell) 提供使用 maxAttempts 的提示。我用过这种方法。并且我能够做到exactly once delivery并且保持消息的顺序。
我更新的云配置:
spring:
autoconfigure:
exclude: org.springframework.cloud.netflix.metrics.servo.ServoMetricsAutoConfiguration
kafka:
consumer:
enable-auto-commit: false
cloud:
stream:
kafka:
binder:
brokers: "${BROKER_HOST:xyz-aws.local:9092}"
headers:
- X-B3-TraceId
- X-B3-SpanId
- X-B3-Sampled
- X-B3-ParentSpanId
- X-Span-Name
- X-Process-Id
zkNodes: "${ZOOKEEPER_HOST:120.211.316.261:2181,120.211.317.252:2181}"
bindings:
feed_platform_events_input:
consumer:
autoCommitOffset: false
binders:
xyzkafka:
type: kafka
bindings:
feed_platform_events_input:
binder: xyzkafka
destination: platform-events
group: br-platform-events
consumer:
maxAttempts: 2147483647
backOffInitialInterval: 1000
backOffMaxInterval: 300000
backOffMultiplier: 2.0
Event Consumer 与我的初始实现相同。除了重新抛出容器知道处理失败的错误。如果您只是捕获它,那么容器就无法知道消息处理失败。通过执行 acknoweldgement.acknowledge 你只是控制偏移提交。为了重试发生,您必须抛出异常。不要忘记将 kafka 客户端自动提交 属性 和 spring(容器级别)autocommitOffset 属性 设置为 false。就是这样。
正如 Marius 所解释的,Kafka 只在日志中维护一个偏移量。如果处理下一条消息,并更新偏移量;失败的消息丢失。
您可以将失败的消息发送到死信主题(将 enableDlq
设置为 true)。
最新版本的 Spring Kafka (2.1.x) 有特殊的错误处理程序 ContainerStoppingErrorHandler
会在发生异常时停止容器, SeekToCurrentErrorHandler
会导致重新传递失败的消息。