Exactly-once delivery: is it possible with spring-cloud-stream-binder-kafka or spring-kafka, and which one should I use?

I am trying to achieve exactly-once delivery in a Spring Boot application using spring-cloud-stream-binder-kafka. The versions I am using are:

Here is my configuration (cloud config):

    spring:
      autoconfigure:
        exclude: org.springframework.cloud.netflix.metrics.servo.ServoMetricsAutoConfiguration
      kafka:
        consumer:
          enable-auto-commit: false
      cloud:
        stream:
          kafka:
            binder:
              brokers: "${BROKER_HOST:xyz-aws.local:9092}"
              headers:
                - X-B3-TraceId
                - X-B3-SpanId
                - X-B3-Sampled
                - X-B3-ParentSpanId
                - X-Span-Name
                - X-Process-Id
              zkNodes: "${ZOOKEEPER_HOST:120.211.316.261:2181,120.211.317.252:2181}"
            bindings:
              feed_platform_events_input:
                consumer:
                  autoCommitOffset: false
          binders:
            xyzkafka:
              type: kafka
          bindings:
            feed_platform_events_input:
              binder: xyzkafka
              destination: platform-events
              group: br-platform-events

I have two main classes. The FeedSink interface:

    package au.com.xyz.proxy.interfaces;

    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.messaging.MessageChannel;

    public interface FeedSink {

        String FEED_PLATFORM_EVENTS_INPUT = "feed_platform_events_input";

        @Input(FeedSink.FEED_PLATFORM_EVENTS_INPUT)
        MessageChannel feedPlatformEventsInput();
    }

The EventConsumer class:

    package au.com.xyz.proxy.consumer;

    import com.google.gson.Gson;

    import lombok.extern.slf4j.Slf4j;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.kafka.support.KafkaHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.util.ObjectUtils;

    import au.com.xyz.proxy.interfaces.FeedSink;
    // application-specific types (CapPointService, EventNotification, SysMessage,
    // ClientResponse, Fault) come from their own packages and are omitted here

    @Slf4j
    @EnableBinding(FeedSink.class)
    public class EventConsumer {

        public static final String SUCCESS_MESSAGE =
                "SEND-SUCCESS : Successfully sent message to platform.";
        public static final String FAULT_MESSAGE = "SOAP-FAULT Code: {}, Description: {}";
        public static final String CONNECT_ERROR_MESSAGE = "CONNECT-ERROR Error Details: {}";
        public static final String EMPTY_NOTIFICATION_ERROR_MESSAGE =
                "EMPTY-NOTIFICATION-ERROR Empty Event Received from platform";

        @Autowired
        private CapPointService service;

        /**
         * Listener method associated with the stream; processes one message at a time.
         */
        @StreamListener(FeedSink.FEED_PLATFORM_EVENTS_INPUT)
        public void message(final @Payload EventNotification eventNotification,
                            final @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {

            String caseMilestone = "UNKNOWN";
            if (!ObjectUtils.isEmpty(eventNotification)) {
                SysMessage sysMessage = processPayload(eventNotification);
                caseMilestone = sysMessage.getCaseMilestone();
                try {
                    ClientResponse response = service.sendPayload(sysMessage);
                    if (response.hasFault()) {
                        Fault faultDetails = response.getFaultDetails();
                        log.error(FAULT_MESSAGE, faultDetails.getCode(), faultDetails.getDescription());
                    } else {
                        log.info(SUCCESS_MESSAGE);
                    }
                    acknowledgment.acknowledge();
                } catch (Exception e) {
                    log.error(CONNECT_ERROR_MESSAGE, e.getMessage());
                }
            } else {
                log.error(EMPTY_NOTIFICATION_ERROR_MESSAGE);
                acknowledgment.acknowledge();
            }
        }

        private SysMessage processPayload(final EventNotification eventNotification) {
            Gson gson = new Gson();
            String jsonString = gson.toJson(eventNotification.getData());
            log.info("Consumed message for platform events with payload : {} ", jsonString);
            return gson.fromJson(jsonString, SysMessage.class);
        }
    }

I have set the auto-commit property to false for both the Kafka client and the Spring container. As you can see in the EventConsumer class, I call Acknowledgment.acknowledge only when service.sendPayload succeeds and no exception is thrown. I expect the container to then move the offset and poll for the next record. What I observed is:

So I did some research and found a lot of links, but unfortunately none of them answered my specific questions. I looked at https://github.com/spring-cloud/spring-cloud-stream/issues/575, which has this comment from Marius (https://stackoverflow.com/users/809122/marius-bogoevici):

Do note that Kafka does not provide individual message acking, which means that acknowledgment translates into updating the latest consumed offset to the offset of the acked message (per topic/partition). That means that if you're acking messages from the same topic partition out of order, a message can 'ack' all the messages before it.
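To make that concrete, here is a minimal sketch using the plain kafka-clients API (not the binder; the broker address, group id, and topic are placeholders, and the poll(Duration) signature assumes kafka-clients 2.x). It shows that a commit records a single per-partition position rather than acking one message:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class OffsetCommitSketch {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.put("group.id", "offset-demo");             // placeholder group
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("platform-events"));
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    // "Acking" record N really means committing position N + 1 for its
                    // partition, which implicitly marks every earlier record on that
                    // partition as consumed - there is no per-message ack.
                    consumer.commitSync(Collections.singletonMap(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1)));
                }
            }
        }
    }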

I am not sure whether ordering is an issue when consumption is single-threaded.

Sorry for the long post, but I wanted to provide enough information. The main thing is that I am trying to avoid losing messages while consuming from Kafka, and I want to see whether spring-cloud-stream-binder-kafka can do the job or whether I have to look at alternatives.

Update 6 July 2018

I came across this post: https://github.com/spring-projects/spring-kafka/issues/431. Is this a better approach to my problem? I can try the latest version of spring-kafka.

    @KafkaListener(id = "qux", topics = "annotated4", containerFactory = "kafkaManualAckListenerContainerFactory",
                    containerGroup = "quxGroup")
    public void listen4(@Payload String foo, Acknowledgment ack, Consumer<?, ?> consumer) {
        // process the record, then commit its offset explicitly
        ack.acknowledge();
    }
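The listener references a kafkaManualAckListenerContainerFactory bean, which presumably looks something like the sketch below (the bean wiring is my assumption; note that in spring-kafka 2.1.x the AckMode enum lives on AbstractMessageListenerContainer, while later versions move it to ContainerProperties):

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.listener.AbstractMessageListenerContainer;

    @Configuration
    @EnableKafka
    public class KafkaManualAckConfig {

        // Hypothetical factory bean matching the containerFactory name used above.
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaManualAckListenerContainerFactory(
                ConsumerFactory<String, String> consumerFactory) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory);
            // MANUAL_IMMEDIATE hands an Acknowledgment to the listener; the offset is
            // committed only when ack.acknowledge() is called (enable.auto.commit=false).
            factory.getContainerProperties().setAckMode(
                    AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
            return factory;
        }
    }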

Last but not least: if this approach is viable, could someone provide some examples of it?

Update 12 July 2018

Thanks to Gary (https://stackoverflow.com/users/1240763/gary-russell) for the tip about using maxAttempts. I have used that approach, and I am able to achieve exactly-once delivery while preserving message order.

My updated cloud config:

    spring:
      autoconfigure:
        exclude: org.springframework.cloud.netflix.metrics.servo.ServoMetricsAutoConfiguration
      kafka:
        consumer:
          enable-auto-commit: false
      cloud:
        stream:
          kafka:
            binder:
              brokers: "${BROKER_HOST:xyz-aws.local:9092}"
              headers:
                - X-B3-TraceId
                - X-B3-SpanId
                - X-B3-Sampled
                - X-B3-ParentSpanId
                - X-Span-Name
                - X-Process-Id
              zkNodes: "${ZOOKEEPER_HOST:120.211.316.261:2181,120.211.317.252:2181}"
            bindings:
              feed_platform_events_input:
                consumer:
                  autoCommitOffset: false
          binders:
            xyzkafka:
              type: kafka
          bindings:
            feed_platform_events_input:
              binder: xyzkafka
              destination: platform-events
              group: br-platform-events
              consumer:
                maxAttempts: 2147483647
                backOffInitialInterval: 1000
                backOffMaxInterval: 300000
                backOffMultiplier: 2.0

The EventConsumer stays the same as my initial implementation, except that it now rethrows the error so the container knows that processing failed. If you just catch the exception, the container has no way of knowing that message processing failed; calling acknowledgment.acknowledge() only controls the offset commit. For the retry to happen, you must throw the exception. Don't forget to set the Kafka client's enable-auto-commit property and the Spring container-level autoCommitOffset property to false. That's it.
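A minimal sketch of that change to the catch block in EventConsumer (wrapping in RuntimeException is my choice; any exception propagated out of the listener works):

    try {
        ClientResponse response = service.sendPayload(sysMessage);
        if (response.hasFault()) {
            Fault faultDetails = response.getFaultDetails();
            log.error(FAULT_MESSAGE, faultDetails.getCode(), faultDetails.getDescription());
        } else {
            log.info(SUCCESS_MESSAGE);
        }
        acknowledgment.acknowledge();
    } catch (Exception e) {
        log.error(CONNECT_ERROR_MESSAGE, e.getMessage());
        // Rethrow so the container sees the failure and the binder retries the record
        // (honouring maxAttempts / backOff* above) instead of silently moving on.
        throw new RuntimeException(e);
    }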

As Marius explained, Kafka only maintains an offset in the log. If you process the next message and update the offset, the failed message is lost.

You can send failed messages to a dead-letter topic (set enableDlq to true).
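For example, a sketch of the binder consumer settings (the dlqName value here is just an illustration; if it is omitted, the Kafka binder defaults to error.&lt;destination&gt;.&lt;group&gt;):

    spring:
      cloud:
        stream:
          kafka:
            bindings:
              feed_platform_events_input:
                consumer:
                  enableDlq: true
                  dlqName: platform-events-dlq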

Recent versions of Spring Kafka (2.1.x) have special error handlers: ContainerStoppingErrorHandler stops the container when an exception occurs, and SeekToCurrentErrorHandler causes the failed message to be redelivered.
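A sketch of wiring one of them in, assuming a listener container factory like the one sketched earlier (this uses the 2.1.x-era ContainerProperties setter; later spring-kafka versions expose the error handler directly on the factory and eventually replace it with CommonErrorHandler):

    import org.springframework.kafka.listener.ContainerStoppingErrorHandler;
    import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

    // Redeliver the failed record by re-seeking its partition on error ...
    factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
    // ... or stop the container entirely on an unrecoverable error:
    // factory.getContainerProperties().setErrorHandler(new ContainerStoppingErrorHandler());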