Equivalent to ChannelInterceptor for Spring Cloud Stream Kafka Streams Binder

We have developed an in-house company framework on top of Spring Boot, and we want to support Kafka Streams through Spring Cloud Stream. We need to automatically inject some headers into all outbound messages. We already do this with the standard Spring Cloud Stream Kafka binder by registering a custom ChannelInterceptor, but that does not work for Kafka Streams, since they seem to follow a different path.
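
For reference, what we register with the message-channel binder looks roughly like this (a simplified sketch; the class name, the header name and value are placeholders, and @GlobalChannelInterceptor is just how we happen to wire it up):

import org.springframework.integration.config.GlobalChannelInterceptor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
@GlobalChannelInterceptor
public class HeaderInjectingChannelInterceptor implements ChannelInterceptor {

  @Override
  public Message<?> preSend(final Message<?> message, final MessageChannel channel) {
    // copy the message and add the framework headers before it reaches the binder
    return MessageBuilder.fromMessage(message)
        .setHeader("x-example-header", "some-value")
        .build();
  }
}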

Is there anything equivalent to a ChannelInterceptor for the Spring Cloud Stream Kafka Streams binder?

I found this customizer/configurer:

  @Bean
  public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {

      factoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
        @Override
        public void configureBuilder(final StreamsBuilder builder) {
          // called with the StreamsBuilder before the topology is built
        }

        @Override
        public void configureTopology(final Topology topology) {
          // called with the fully built Topology, before the KafkaStreams instance is created
        }
      });

      factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
        @Override
        public void customize(final KafkaStreams kafkaStreams) {
          // called with the KafkaStreams instance before it is started
        }
      });

    };
  }

My last idea was to use the configureTopology method to modify the Topology automatically and insert a Transformer just before the final sink node. But to add that new node I have to specify its parent node, so I would need to know the name of the last sink node, and all node names seem to be auto-generated by Kafka Streams. The only way I can see is to call topology.describe() and possibly parse its string output, which sounds far too complicated compared with a simple ChannelInterceptor.
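
In case it matters, I suppose the sink node names could at least be read from the TopologyDescription object instead of parsing the string output, something along these lines (just a sketch):

import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;

public static void printSinkNodes(final Topology topology) {
  final TopologyDescription description = topology.describe();
  for (final TopologyDescription.Subtopology subtopology : description.subtopologies()) {
    for (final TopologyDescription.Node node : subtopology.nodes()) {
      if (node instanceof TopologyDescription.Sink) {
        final TopologyDescription.Sink sink = (TopologyDescription.Sink) node;
        // node names are auto-generated by Kafka Streams, e.g. "KSTREAM-SINK-0000000003"
        System.out.println(sink.name() + " -> " + sink.topic());
      }
    }
  }
}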

Any ideas?

You can add a ProducerInterceptor to the streams configuration:

/**
 * A plugin interface that allows you to intercept (and possibly mutate) the records received by the producer before
 * they are published to the Kafka cluster.
 * <p>
 * This class will get producer config properties via <code>configure()</code> method, including clientId assigned
 * by KafkaProducer if not specified in the producer config. The interceptor implementation needs to be aware that it will be
 * sharing producer config namespace with other interceptors and serializers, and ensure that there are no conflicts.
 * <p>
 * Exceptions thrown by ProducerInterceptor methods will be caught, logged, but not propagated further. As a result, if
 * the user configures the interceptor with the wrong key and value type parameters, the producer will not throw an exception,
 * just log the errors.
 * <p>
 * ProducerInterceptor callbacks may be called from multiple threads. Interceptor implementation must ensure thread-safety, if needed.
 * <p>
 * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
 */
public interface ProducerInterceptor<K, V> extends Configurable {

You can modify the record's headers there.
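
A minimal sketch of such an interceptor (the header name and value are placeholders; byte[]/byte[] is used because Kafka Streams hands already-serialized records to its producers):

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class HeaderInjectingProducerInterceptor implements ProducerInterceptor<byte[], byte[]> {

  @Override
  public ProducerRecord<byte[], byte[]> onSend(final ProducerRecord<byte[], byte[]> record) {
    // called for every record before it is sent to the broker; add the headers here
    record.headers().add("x-example-header", "some-value".getBytes(StandardCharsets.UTF_8));
    return record;
  }

  @Override
  public void onAcknowledgement(final RecordMetadata metadata, final Exception exception) {
    // no-op
  }

  @Override
  public void close() {
    // no-op
  }

  @Override
  public void configure(final Map<String, ?> configs) {
    // no-op; the producer config properties are available here if needed
  }
}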

(You can also use this technique for the message channel binder, instead of a ChannelInterceptor.)
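
As for wiring it in: one option (a sketch, assuming the factory bean's streams configuration is already populated when the configurer runs) is to add the interceptor class to the producer properties through the same StreamsBuilderFactoryBeanConfigurer shown in the question; the same should also be achievable declaratively via the binder's spring.cloud.stream.kafka.streams.binder.configuration properties.

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;

@Configuration
public class InterceptorRegistrationConfig {

  @Bean
  public StreamsBuilderFactoryBeanConfigurer producerInterceptorConfigurer() {
    return factoryBean -> factoryBean.getStreamsConfiguration().put(
        // the "producer." prefix routes the property to the producers created by Kafka Streams
        StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG),
        HeaderInjectingProducerInterceptor.class.getName());
  }
}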