Spring 流式传输 kafka Binder 测试自定义 Headers
Spring Stream kafka Binder Test Custom Headers
我正在尝试弄清楚如何在 Spring Message<?>
中使用 Kafka Binder 在 Spring Cloud Stream 中包含自定义 header。我的目客户 header 数据)。
我觉得我错过了一些东西,因为我似乎可以使用 TestChannelBinder
例如
让它工作
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.util.function.Function;
@Component
@Slf4j
public class BaseStream implements Function<Message<String>, String> {
@Override
public String apply(Message<String> transactionMessage) {
log.debug("Converted Message: {} ", transactionMessage);
return transactionMessage.getPayload();
}
}
使用测试活页夹测试 class:
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;
@SpringBootTest
@TestPropertySource("classpath:testStream.properties")
@Import(TestChannelBinderConfiguration.class)
public class TestForStream {
@Autowired
InputDestination inputDestination;
@Autowired
OutputDestination outputDestination;
@Test
void contextLoads() {
inputDestination.send(MessageBuilder
.withPayload("Test Payload")
.setHeader("customHeader", "headerSpecificData")
.build());
}
}
testStream.properties
spring.cloud.function.definition=baseStream
spring.cloud.stream.bindings.baseStream-in-0.destination=test-in
spring.cloud.stream.bindings.baseStream-out-0.destination=test-out
spring.cloud.stream.bindings.baseStream-in-0.group=test-group-base
当 运行:
时记录
Converted Message: GenericMessage [payload=Test Payload, headers={id=5c6d1082-c084-0b25-4afc-b5d97bf537f9, customHeader=headerSpecificData, contentType=application/json, timestamp=1639398696800, target-protocol=kafka}]
这就是我想要做的。但是当我尝试为 kafka bider 测试它时,它似乎将 Message<String>
object 作为 JSON 字符串包含在有效负载中,我认为它会被解析为请求的输入函数 BaseStream
.
只是想知道是否有人可以看到我的测试出了什么问题,因为我已经尝试了各种方法来让它工作,并且看到它与测试活页夹一起工作时我会假设它适用于 Kafka活页夹.
Kafka Binder 测试Class:
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.TestPropertySource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092"})
@SpringBootTest
@TestPropertySource("classpath:testStream.properties")
public class TestForStream {
public static CountDownLatch latch = new CountDownLatch(1);
@Autowired
public EmbeddedKafkaBroker broker;
@Test
void contextLoads() {
sleep(5);//Included this as it takes some time to init>
sendMessage("test-in", MessageBuilder
.withPayload("Test Payload")
.setHeader("customHeader", "headerSpecificData")
.build());
}
public <T> ProducerFactory<String, T> createProducerFactory() {
Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(broker));
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
//Is JsonSerializer correct for a message?
return new DefaultKafkaProducerFactory<>(configs);
}
public <T> void sendMessage(String topic, T listObj) {
try {
KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(createProducerFactory());
kafkaTemplate.send(new ProducerRecord<>(topic, listObj));
}catch (Exception e){
e.printStackTrace();
}
}
public void sleep(long time){
try {
latch.await(time, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消息的kafka binder测试日志:
Converted Message: GenericMessage [payload={"payload":"Test Payload","headers":{"customHeader":"headerSpecificData","id":"d540a3ca-28db-b137-fc86-c25cc4b7eb8b","timestamp":1639399810476}}, headers={deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=test-in, target-protocol=kafka, kafka_offset=0, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@79580279, id=1cf2d382-df29-2672-4180-07da99e58244, kafka_receivedPartitionId=0, kafka_receivedTimestamp=1639399810526, contentType=application/json, __TypeId__=[B@24c79350, kafka_groupId=test-group-base, timestamp=1639399810651}]
所以这里的消息已经包含在有效负载中,kafka headers 包含在 headers 中,正如预期的那样。
我已经尝试 spring.cloud.stream.kafka.binder.headers
和 headerMode
看看他们是否会改变任何东西但无济于事。
编辑:
使用springCloudVersion = 2020.0.3
我正在使用:
public <T> void sendMessage(String topic, T listObj) {
try {
KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(createProducerFactory());
kafkaTemplate.send(new ProducerRecord<>(topic, listObj));
}catch (Exception e){
e.printStackTrace();
}
}
发送将消息作为值的消息。
我应该使用的:
public void sendMessage(String topic, Message<?> listObj) {
try {
KafkaTemplate<String, Message<?>> kafkaTemplate = new KafkaTemplate<>(createProducerFactory());
kafkaTemplate.setDefaultTopic(topic);
kafkaTemplate.send(listObj);
}catch (Exception e){
e.printStackTrace();
}
}
我正在尝试弄清楚如何在 Spring Message<?>
中使用 Kafka Binder 在 Spring Cloud Stream 中包含自定义 header。我的目客户 header 数据)。
我觉得我错过了一些东西,因为我似乎可以使用 TestChannelBinder
例如
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.util.function.Function;
@Component
@Slf4j
public class BaseStream implements Function<Message<String>, String> {
@Override
public String apply(Message<String> transactionMessage) {
log.debug("Converted Message: {} ", transactionMessage);
return transactionMessage.getPayload();
}
}
使用测试活页夹测试 class:
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;
@SpringBootTest
@TestPropertySource("classpath:testStream.properties")
@Import(TestChannelBinderConfiguration.class)
public class TestForStream {
@Autowired
InputDestination inputDestination;
@Autowired
OutputDestination outputDestination;
@Test
void contextLoads() {
inputDestination.send(MessageBuilder
.withPayload("Test Payload")
.setHeader("customHeader", "headerSpecificData")
.build());
}
}
testStream.properties
spring.cloud.function.definition=baseStream
spring.cloud.stream.bindings.baseStream-in-0.destination=test-in
spring.cloud.stream.bindings.baseStream-out-0.destination=test-out
spring.cloud.stream.bindings.baseStream-in-0.group=test-group-base
当 运行:
时记录Converted Message: GenericMessage [payload=Test Payload, headers={id=5c6d1082-c084-0b25-4afc-b5d97bf537f9, customHeader=headerSpecificData, contentType=application/json, timestamp=1639398696800, target-protocol=kafka}]
这就是我想要做的。但是当我尝试为 kafka bider 测试它时,它似乎将 Message<String>
object 作为 JSON 字符串包含在有效负载中,我认为它会被解析为请求的输入函数 BaseStream
.
只是想知道是否有人可以看到我的测试出了什么问题,因为我已经尝试了各种方法来让它工作,并且看到它与测试活页夹一起工作时我会假设它适用于 Kafka活页夹.
Kafka Binder 测试Class:
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.TestPropertySource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092"})
@SpringBootTest
@TestPropertySource("classpath:testStream.properties")
public class TestForStream {
public static CountDownLatch latch = new CountDownLatch(1);
@Autowired
public EmbeddedKafkaBroker broker;
@Test
void contextLoads() {
sleep(5);//Included this as it takes some time to init>
sendMessage("test-in", MessageBuilder
.withPayload("Test Payload")
.setHeader("customHeader", "headerSpecificData")
.build());
}
public <T> ProducerFactory<String, T> createProducerFactory() {
Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(broker));
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
//Is JsonSerializer correct for a message?
return new DefaultKafkaProducerFactory<>(configs);
}
public <T> void sendMessage(String topic, T listObj) {
try {
KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(createProducerFactory());
kafkaTemplate.send(new ProducerRecord<>(topic, listObj));
}catch (Exception e){
e.printStackTrace();
}
}
public void sleep(long time){
try {
latch.await(time, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消息的kafka binder测试日志:
Converted Message: GenericMessage [payload={"payload":"Test Payload","headers":{"customHeader":"headerSpecificData","id":"d540a3ca-28db-b137-fc86-c25cc4b7eb8b","timestamp":1639399810476}}, headers={deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=test-in, target-protocol=kafka, kafka_offset=0, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@79580279, id=1cf2d382-df29-2672-4180-07da99e58244, kafka_receivedPartitionId=0, kafka_receivedTimestamp=1639399810526, contentType=application/json, __TypeId__=[B@24c79350, kafka_groupId=test-group-base, timestamp=1639399810651}]
所以这里的消息已经包含在有效负载中,kafka headers 包含在 headers 中,正如预期的那样。
我已经尝试 spring.cloud.stream.kafka.binder.headers
和 headerMode
看看他们是否会改变任何东西但无济于事。
编辑:
使用springCloudVersion = 2020.0.3
我正在使用:
public <T> void sendMessage(String topic, T listObj) {
try {
KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(createProducerFactory());
kafkaTemplate.send(new ProducerRecord<>(topic, listObj));
}catch (Exception e){
e.printStackTrace();
}
}
发送将消息作为值的消息。
我应该使用的:
public void sendMessage(String topic, Message<?> listObj) {
try {
KafkaTemplate<String, Message<?>> kafkaTemplate = new KafkaTemplate<>(createProducerFactory());
kafkaTemplate.setDefaultTopic(topic);
kafkaTemplate.send(listObj);
}catch (Exception e){
e.printStackTrace();
}
}