Spring Cloud Stream Kafka Binder Test Custom Headers

I'm trying to figure out how to include a custom header on a Spring Message<?> in Spring Cloud Stream with the Kafka Binder. My goal is to include some custom header data (customer header data).

I feel like I'm missing something, because I can get it working with the TestChannelBinder, for example:
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import java.util.function.Function;

@Component
@Slf4j
public class BaseStream implements Function<Message<String>, String> {
    @Override
    public String apply(Message<String> transactionMessage) {
        log.debug("Converted Message: {} ", transactionMessage);
        return transactionMessage.getPayload();
    }

}
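For reference, the custom header is also readable inside the function through the message's headers; a minimal sketch (the class name `HeaderAwareStream` is hypothetical, the header key matches the tests below):

```java
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import java.util.function.Function;

@Component
public class HeaderAwareStream implements Function<Message<String>, String> {
    @Override
    public String apply(Message<String> message) {
        // Headers are exposed as a MessageHeaders map on the Spring Message;
        // get(key, type) returns null if the header is absent
        String custom = message.getHeaders().get("customHeader", String.class);
        return custom + ":" + message.getPayload();
    }
}
```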

Test class using the test binder:


import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.test.context.TestPropertySource;


@SpringBootTest
@TestPropertySource("classpath:testStream.properties")
@Import(TestChannelBinderConfiguration.class)
public class TestForStream {


    @Autowired
    InputDestination inputDestination;
    @Autowired
    OutputDestination outputDestination;

    @Test
    void contextLoads() {
        inputDestination.send(MessageBuilder
                .withPayload("Test Payload")
                .setHeader("customHeader", "headerSpecificData")
                .build());
    }
}
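With the test binder, the result can also be asserted by reading from the `OutputDestination`; a sketch, assuming AssertJ's `assertThat` is on the test classpath (the destination name comes from testStream.properties):

```java
@Test
void passesPayloadThrough() {
    inputDestination.send(MessageBuilder
            .withPayload("Test Payload")
            .setHeader("customHeader", "headerSpecificData")
            .build());

    // receive(timeoutMillis, destinationName) pulls the next message off the binding
    Message<byte[]> received = outputDestination.receive(1000, "test-out");
    // the test binder delivers payloads as bytes
    assertThat(new String(received.getPayload())).isEqualTo("Test Payload");
}
```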

testStream.properties

spring.cloud.function.definition=baseStream
spring.cloud.stream.bindings.baseStream-in-0.destination=test-in
spring.cloud.stream.bindings.baseStream-out-0.destination=test-out
spring.cloud.stream.bindings.baseStream-in-0.group=test-group-base

The log when running:
Converted Message: GenericMessage [payload=Test Payload, headers={id=5c6d1082-c084-0b25-4afc-b5d97bf537f9, customHeader=headerSpecificData, contentType=application/json, timestamp=1639398696800, target-protocol=kafka}]

Which is what I'm looking to do. But when I try to test it with the Kafka binder, it seems to include the Message<String> object as a JSON string in the payload, whereas I assumed it would be parsed into the input that the function BaseStream requests.

Just wondering if anyone can see what's going wrong with my test, as I've tried various things to get it working, and since it works with the test binder I assumed it would work with the Kafka binder.

Kafka binder test class:

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.TestPropertySource;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;


@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092"})
@SpringBootTest
@TestPropertySource("classpath:testStream.properties")
public class TestForStream {

    public static CountDownLatch latch = new CountDownLatch(1);
    @Autowired
    public EmbeddedKafkaBroker broker;

    @Test
    void contextLoads() {
        sleep(5); // included because the binder takes some time to initialize

        sendMessage("test-in", MessageBuilder
                .withPayload("Test Payload")
                .setHeader("customHeader", "headerSpecificData")
                .build());


    }

    public <T> ProducerFactory<String, T> createProducerFactory() {
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(broker));
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        //Is JsonSerializer correct for a message?
        return new DefaultKafkaProducerFactory<>(configs);
    }

    public <T> void sendMessage(String topic, T listObj) {
        try {
            KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(createProducerFactory());
            kafkaTemplate.send(new ProducerRecord<>(topic, listObj));
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public void sleep(long time){
        try {
            latch.await(time, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}


Kafka binder test log for the message:

Converted Message: GenericMessage [payload={"payload":"Test Payload","headers":{"customHeader":"headerSpecificData","id":"d540a3ca-28db-b137-fc86-c25cc4b7eb8b","timestamp":1639399810476}}, headers={deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=test-in, target-protocol=kafka, kafka_offset=0, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@79580279, id=1cf2d382-df29-2672-4180-07da99e58244, kafka_receivedPartitionId=0, kafka_receivedTimestamp=1639399810526, contentType=application/json, __TypeId__=[B@24c79350, kafka_groupId=test-group-base, timestamp=1639399810651}]

So here the whole Message has been wrapped inside the payload, and the Kafka headers are included in the headers as expected.

I've tried spring.cloud.stream.kafka.binder.headers and headerMode to see if they would change anything, but to no avail.

Edit:

Using springCloudVersion = 2020.0.3

I was using:

public <T> void sendMessage(String topic, T listObj) {
    try {
        KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(createProducerFactory());
        kafkaTemplate.send(new ProducerRecord<>(topic, listObj));
    }catch (Exception e){
        e.printStackTrace();
    }
}

to send the message, which puts the whole Message object in as the record value.

What I should have been using:

public void sendMessage(String topic, Message<?> listObj) {
    try {
        KafkaTemplate<String, Message<?>> kafkaTemplate = new KafkaTemplate<>(createProducerFactory());
        kafkaTemplate.setDefaultTopic(topic);
        kafkaTemplate.send(listObj);
    }catch (Exception e){
        e.printStackTrace();
    }
}
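The difference is that KafkaTemplate.send(Message<?>) runs the message through the template's message converter, which maps the Spring headers onto Kafka record headers and serializes only the payload as the record value. Assuming the payload stays a plain String, that also means a StringSerializer is enough and avoids the JSON quoting that JsonSerializer would add; a sketch of the adjusted factory (names otherwise as in the test class above):

```java
public ProducerFactory<String, String> createStringProducerFactory() {
    Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(broker));
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // send(Message<?>) serializes only the payload as the value, and carries
    // the Spring headers across as Kafka record headers, so a String payload
    // needs no JSON serializer here
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(configs);
}
```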