相当于 Spring Cloud Stream Kafka Streams Binder 的 ChannelInterceptor
Equivalent to ChannelInterceptor for Spring Cloud Stream Kafka Streams Binder
我们在 Spring Boot 之上开发了一个内部公司框架,我们希望通过 Spring Cloud Stream 支持 Kafka-Streams。我们需要自动注入一些 headers 到所有出站消息。我们已经通过标准 Spring Cloud Stream Kafka Binder 注册自定义 ChannelInterceptor
实现了这一点,但这不适用于 Kafka Streams,因为它们似乎遵循不同的路径。
对于 Spring Cloud Stream Kafka Streams 活页夹,是否有任何等同于 ChannelInterceptor
的东西?
我找到这个 customizer/configurer:
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
factoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
@Override
public void configureBuilder(final StreamsBuilder builder) {
}
@Override
public void configureTopology(final Topology topology) {
}
});
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(final KafkaStreams kafkaStreams) {
}
});
};
}
我最后的想法是使用configureTopology
方法自动修改Topology
并在最后一个sink节点之前插入一个Transformer
,但是为了添加这个新节点我必须指定 parent 节点,所以我需要知道最后一个接收器节点的名称,也许所有节点名称都是由 Kafka Streams 自动生成的......唯一的方法是使用 topology.describe( ) 方法并可能解析字符串输出...与简单的 ChannelInterceptor
.
相比,这听起来太复杂了
有什么想法吗?
您可以在流配置中添加 ProducerInterceptor
。
/**
* A plugin interface that allows you to intercept (and possibly mutate) the records received by the producer before
* they are published to the Kafka cluster.
* <p>
* This class will get producer config properties via <code>configure()</code> method, including clientId assigned
* by KafkaProducer if not specified in the producer config. The interceptor implementation needs to be aware that it will be
* sharing producer config namespace with other interceptors and serializers, and ensure that there are no conflicts.
* <p>
* Exceptions thrown by ProducerInterceptor methods will be caught, logged, but not propagated further. As a result, if
* the user configures the interceptor with the wrong key and value type parameters, the producer will not throw an exception,
* just log the errors.
* <p>
* ProducerInterceptor callbacks may be called from multiple threads. Interceptor implementation must ensure thread-safety, if needed.
* <p>
* Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
*/
public interface ProducerInterceptor<K, V> extends Configurable {
您可以在那里修改记录的 headers。
(您也可以将此技术用于消息通道绑定器,而不是 ChannelInterceptor
)。
我们在 Spring Boot 之上开发了一个内部公司框架,我们希望通过 Spring Cloud Stream 支持 Kafka-Streams。我们需要自动注入一些 headers 到所有出站消息。我们已经通过标准 Spring Cloud Stream Kafka Binder 注册自定义 ChannelInterceptor
实现了这一点,但这不适用于 Kafka Streams,因为它们似乎遵循不同的路径。
对于 Spring Cloud Stream Kafka Streams 活页夹,是否有任何等同于 ChannelInterceptor
的东西?
我找到这个 customizer/configurer:
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
factoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
@Override
public void configureBuilder(final StreamsBuilder builder) {
}
@Override
public void configureTopology(final Topology topology) {
}
});
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(final KafkaStreams kafkaStreams) {
}
});
};
}
我最后的想法是使用configureTopology
方法自动修改Topology
并在最后一个sink节点之前插入一个Transformer
,但是为了添加这个新节点我必须指定 parent 节点,所以我需要知道最后一个接收器节点的名称,也许所有节点名称都是由 Kafka Streams 自动生成的......唯一的方法是使用 topology.describe( ) 方法并可能解析字符串输出...与简单的 ChannelInterceptor
.
有什么想法吗?
您可以在流配置中添加 ProducerInterceptor
。
/**
* A plugin interface that allows you to intercept (and possibly mutate) the records received by the producer before
* they are published to the Kafka cluster.
* <p>
* This class will get producer config properties via <code>configure()</code> method, including clientId assigned
* by KafkaProducer if not specified in the producer config. The interceptor implementation needs to be aware that it will be
* sharing producer config namespace with other interceptors and serializers, and ensure that there are no conflicts.
* <p>
* Exceptions thrown by ProducerInterceptor methods will be caught, logged, but not propagated further. As a result, if
* the user configures the interceptor with the wrong key and value type parameters, the producer will not throw an exception,
* just log the errors.
* <p>
* ProducerInterceptor callbacks may be called from multiple threads. Interceptor implementation must ensure thread-safety, if needed.
* <p>
* Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
*/
public interface ProducerInterceptor<K, V> extends Configurable {
您可以在那里修改记录的 headers。
(您也可以将此技术用于消息通道绑定器,而不是 ChannelInterceptor
)。