阻止 __TypeId__ 在 Spring Cloud Stream 中使用

Prevent __TypeId__ to be used in Spring Cloud Stream

我们有一个流氓生产者将 Kafka Header __TypeId__ 设置为 class 这是生产者的一部分,但不是 中实施的消费者Spring Cloud Stream 应用程序使用 Kafka Streams 活页夹。结果出现异常 java.lang.IllegalArgumentException: The class 'com.bad.MyClass' is not in the trusted packages: [java.util, java.lang, de.datev.pws.loon.dcp.foreignmodels.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

我们如何确保在消费者内部忽略此 TypeId header?

一些 Whosebug 答案指向 spring.json.use.type.headers=false,但它似乎是一个“旧”属性,不再有效。

application.yaml:

spring:
  json.use.type.headers: false
  application:
    name: dcp-all
  kafka:
    bootstrap-servers: 'xxxxx.kafka.dev.dvint.de:9093'
  cloud:
    stream:
      kafka:
        streams:
          binder:
            required-acks: -1 # all in-sync-replicas

...

堆栈跟踪:

    at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:129)
    at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103)
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:569)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:304)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:960)
    at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1068)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:962)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:751)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)

这是一个单元测试

    @Test
    void consumeWorksEvenWithBadTypesHeader() throws JsonProcessingException, InterruptedException {
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
        producerProps.put("key.serializer", StringSerializer.class.getName());
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(producerProps);

        List<Header> headers = Arrays.asList(new RecordHeader("__TypeId__", "com.bad.MyClass".getBytes()));
        ProducerRecord<String,String> p = new ProducerRecord(TOPIC1, 0, "any-key",
            "{ ... some valid JSON ...}", headers);

        try {
            KafkaTemplate<String, String> template = new KafkaTemplate<>(pf, true);
            template.send(p);

            ConsumerRecord<String, String> consumerRecord = KafkaTestUtils.getSingleRecord(consumer, TOPIC2, DEFAULT_CONSUMER_POLL_TIME);

            // Assertions ...
        } finally {
            pf.destroy();
        }
    }

您有 2 个选择:

  1. 在生产者端设置 属性 以省略添加类型信息 headers
  2. 在消费者端,将 属性 设置为不使用类型信息 headers

https://docs.spring.io/spring-kafka/docs/current/reference/html/#json-serde

这不是“老”属性。

    /**
     * Kafka config property for using type headers (default true).
     * @since 2.2.3
     */
    public static final String USE_TYPE_INFO_HEADERS = "spring.json.use.type.headers";

需要在消费者属性中设置。