Spring 来自 Kafka 主题的 Cloud Stream 反序列化无效 JSON
Spring Cloud Stream deserializing invalid JSON from Kafka Topic
我正在将 Spring Cloud Stream 与 Kafka Binder 集成。目标是让我的应用程序消费主题中的 JSON 并将其反序列化为 Java 对象。我使用的是函数式风格而非命令式风格。当输入是结构良好的 JSON 时,我的代码工作正常。
另一方面,当我发送无效的 JSON 时,我希望触发错误记录方法。这在某些测试用例中有效,在其他测试用例中无效:即使 JSON 无效,我的应用程序仍会将其反序列化,并触发业务逻辑方法而不是错误记录方法。
我无法解决框架反序列化某些非结构化 json 输入的问题。
// Event envelope deserialized from the Kafka topic.
// NOTE(review): Jackson enforces @JsonProperty(required = true) only for
// @JsonCreator (constructor) properties; with the no-args-constructor +
// setter binding generated by Lombok here it is silently ignored, so an
// empty "{}" payload deserializes into an instance whose fields are null.
// Lombok's @NonNull likewise only guards the generated constructor/setters,
// which are simply never invoked for absent properties — this matches the
// null-field KafkaEventRecord observed in rejectCorruptedMessage2.
@Builder
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class KafkaEventRecord {
// Transport-level metadata, expected under the JSON key "transport_metadata".
@JsonProperty(value = "transport_metadata", required = true)
@NonNull
private TransportMetadata transportMetadata;
// Business payload of the event, expected under the JSON key "payload".
@JsonProperty(value = "payload", required = true)
@NonNull
private Payload payload;
}
/**
 * Functional-style Spring Cloud Stream consumer: the {@code consumer} bean is
 * bound to the {@code consumer-in-0} destination and receives events whose
 * value deserialization succeeded; deserialization failures are diverted to
 * the error channel by the configured ErrorHandlingDeserializer.
 */
@Slf4j // required: the lambda below logs through the Lombok-generated 'log' field
@Component
public class TokenEventConsumer {

    /**
     * Consumes a {@link KafkaEventRecord} deserialized from the Kafka topic
     * and logs it. Contains no further business logic in this snippet.
     */
    @Bean
    Consumer<KafkaEventRecord> consumer() {
        return event -> log.info("Kafka Event data consumed from Kafka {}", event);
    }
}
// Global error sink for the binder: Spring Integration publishes an
// ErrorMessage to the well-known "errorChannel" when message handling fails
// (e.g. the ErrorHandlingDeserializer reports a value-deserialization error),
// and this service activator logs it.
@Configuration
@Slf4j
public class CloudStreamErrorHandler {
// Invoked for every ErrorMessage arriving on "errorChannel"; the test class
// spies on this method to assert that corrupt records are rejected.
@ServiceActivator(inputChannel = "errorChannel")
public void handleError(ErrorMessage errorMessage) {
log.error("Error Message is {}", errorMessage);
}
}
/**
 * Integration test against an embedded Kafka broker: publishes corrupt JSON
 * and asserts that the record is routed to the error channel (via a spy on
 * {@link CloudStreamErrorHandler#handleError}).
 */
@EmbeddedKafka(topics = {"batch-in"}, partitions = 3)
@TestPropertySource(properties = {"spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"})
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@ActiveProfiles("test")
@Slf4j
public class KafkaTokenConsumerTest {

    // Topic under test; must match the @EmbeddedKafka declaration above.
    private static final String TOPIC = "batch-in";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @Autowired
    private KafkaListenerEndpointRegistry endpointRegistry;
    @Autowired
    private ObjectMapper objectMapper;
    @SpyBean
    KafkaEventHandlerFactory kafkaEventHandlerFactory;
    @SpyBean
    CloudStreamErrorHandler cloudStreamErrorHandler;

    @BeforeEach
    void setUp() {
        // Wait until every listener container has its partitions assigned,
        // otherwise a record sent right after startup can be missed.
        for (MessageListenerContainer container : endpointRegistry.getListenerContainers()) {
            ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
        }
    }

    // PASSES: "{{{{" is structurally invalid JSON, deserialization fails and
    // the record reaches the error channel.
    @Test
    public void rejectCorruptedMessage() throws ExecutionException, InterruptedException {
        kafkaTemplate.send(TOPIC, "{{{{").get(); // synchronous send
        // Mockito's timeout() polls until the verification succeeds (or 5 s
        // elapse). This replaces the previous CountDownLatch(1) that was never
        // counted down — it always slept the full 5 s and could still race the
        // asynchronous consumer. Two invocations are expected because the
        // framework retries the failing record before giving up.
        verify(cloudStreamErrorHandler, timeout(5000L).times(2)).handleError(isA(ErrorMessage.class));
    }

    // FAILS as written: Jackson parses the leading "{}" of "{}}}"
    // as an empty object and ignores the trailing tokens, so the record is
    // delivered to the consumer (with null fields) instead of the error
    // channel. See FAIL_ON_TRAILING_TOKENS for the remedy.
    @Test
    public void rejectCorruptedMessage2() throws ExecutionException, InterruptedException {
        kafkaTemplate.send(TOPIC, "{}}}").get(); // synchronous send
        verify(cloudStreamErrorHandler, timeout(5000L).times(2)).handleError(isA(ErrorMessage.class));
    }
}
spring.cloud.stream.kafka.bindings.consumer-in-0.consumer.configuration.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.consumer-in-0.consumer.configuration.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.cloud.stream.kafka.bindings.consumer-in-0.consumer.configuration.key.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.cloud.stream.kafka.bindings.consumer-in-0.consumer.configuration.value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
# Producer only for testing purposes ('#' starts a comment in .properties; '//' would be parsed as part of a key)
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
在 rejectCorruptedMessage 测试方法中,无效的 JSON 触发了 handleError(ErrorMessage errorMessage)
方法,这是意料之中的,因为它是无效的 JSON。另一方面,
在 rejectCorruptedMessage2 测试方法中,JSON 触发的是 TokenEventConsumer 类中的 Consumer&lt;KafkaEventRecord&gt; consumer()
方法,这不是预期的行为;而且我得到的 KafkaEventRecord 对象的字段全部为 null。
Jackson 不认为这是无效的 JSON,它只是忽略尾随 }}
并将 {}
解码为空对象。
/**
 * Minimal reproduction of why "{}}}" is NOT rejected: Jackson's
 * ObjectReader.readValue stops after the first complete JSON value ("{}")
 * and, by default, silently ignores any trailing tokens. Enabling
 * DeserializationFeature.FAIL_ON_TRAILING_TOKENS on the ObjectMapper makes
 * such input throw instead.
 */
public class So67804599Application {

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Well-formed input binds normally.
        Object foo = mapper.readerFor(Foo.class).readValue("{\"bar\":\"baz\"}");
        System.out.println(foo);
        // "{}" parses as an empty object; the trailing "}}" is ignored,
        // yielding a Foo with a null field rather than an exception.
        foo = mapper.readerFor(Foo.class).readValue("{}}}");
        System.out.println(foo);
    }

    /** Simple mutable POJO target for the deserialization demo. */
    public static class Foo {

        String bar;

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }
    }
}
Foo [bar=baz]
Foo [bar=null]
我正在将 Spring Cloud Stream 与 Kafka Binder 集成。目标是让我的应用程序消费主题中的 JSON 并将其反序列化为 Java 对象。我使用的是函数式风格而非命令式风格。当输入是结构良好的 JSON 时,我的代码工作正常。
另一方面,当我发送无效的 JSON 时,我希望触发错误记录方法。这在某些测试用例中有效,在其他测试用例中无效:即使 JSON 无效,我的应用程序仍会将其反序列化,并触发业务逻辑方法而不是错误记录方法。
我无法解决框架反序列化某些非结构化 json 输入的问题。
// Event envelope deserialized from the Kafka topic.
// NOTE(review): Jackson enforces @JsonProperty(required = true) only for
// @JsonCreator (constructor) properties; with the no-args-constructor +
// setter binding generated by Lombok here it is silently ignored, so an
// empty "{}" payload deserializes into an instance whose fields are null.
// Lombok's @NonNull likewise only guards the generated constructor/setters,
// which are simply never invoked for absent properties — this matches the
// null-field KafkaEventRecord observed in rejectCorruptedMessage2.
@Builder
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class KafkaEventRecord {
// Transport-level metadata, expected under the JSON key "transport_metadata".
@JsonProperty(value = "transport_metadata", required = true)
@NonNull
private TransportMetadata transportMetadata;
// Business payload of the event, expected under the JSON key "payload".
@JsonProperty(value = "payload", required = true)
@NonNull
private Payload payload;
}
/**
 * Functional-style Spring Cloud Stream consumer: the {@code consumer} bean is
 * bound to the {@code consumer-in-0} destination and receives events whose
 * value deserialization succeeded; deserialization failures are diverted to
 * the error channel by the configured ErrorHandlingDeserializer.
 */
@Slf4j // required: the lambda below logs through the Lombok-generated 'log' field
@Component
public class TokenEventConsumer {

    /**
     * Consumes a {@link KafkaEventRecord} deserialized from the Kafka topic
     * and logs it. Contains no further business logic in this snippet.
     */
    @Bean
    Consumer<KafkaEventRecord> consumer() {
        return event -> log.info("Kafka Event data consumed from Kafka {}", event);
    }
}
// Global error sink for the binder: Spring Integration publishes an
// ErrorMessage to the well-known "errorChannel" when message handling fails
// (e.g. the ErrorHandlingDeserializer reports a value-deserialization error),
// and this service activator logs it.
@Configuration
@Slf4j
public class CloudStreamErrorHandler {
// Invoked for every ErrorMessage arriving on "errorChannel"; the test class
// spies on this method to assert that corrupt records are rejected.
@ServiceActivator(inputChannel = "errorChannel")
public void handleError(ErrorMessage errorMessage) {
log.error("Error Message is {}", errorMessage);
}
}
/**
 * Integration test against an embedded Kafka broker: publishes corrupt JSON
 * and asserts that the record is routed to the error channel (via a spy on
 * {@link CloudStreamErrorHandler#handleError}).
 */
@EmbeddedKafka(topics = {"batch-in"}, partitions = 3)
@TestPropertySource(properties = {"spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"})
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@ActiveProfiles("test")
@Slf4j
public class KafkaTokenConsumerTest {

    // Topic under test; must match the @EmbeddedKafka declaration above.
    private static final String TOPIC = "batch-in";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @Autowired
    private KafkaListenerEndpointRegistry endpointRegistry;
    @Autowired
    private ObjectMapper objectMapper;
    @SpyBean
    KafkaEventHandlerFactory kafkaEventHandlerFactory;
    @SpyBean
    CloudStreamErrorHandler cloudStreamErrorHandler;

    @BeforeEach
    void setUp() {
        // Wait until every listener container has its partitions assigned,
        // otherwise a record sent right after startup can be missed.
        for (MessageListenerContainer container : endpointRegistry.getListenerContainers()) {
            ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
        }
    }

    // PASSES: "{{{{" is structurally invalid JSON, deserialization fails and
    // the record reaches the error channel.
    @Test
    public void rejectCorruptedMessage() throws ExecutionException, InterruptedException {
        kafkaTemplate.send(TOPIC, "{{{{").get(); // synchronous send
        // Mockito's timeout() polls until the verification succeeds (or 5 s
        // elapse). This replaces the previous CountDownLatch(1) that was never
        // counted down — it always slept the full 5 s and could still race the
        // asynchronous consumer. Two invocations are expected because the
        // framework retries the failing record before giving up.
        verify(cloudStreamErrorHandler, timeout(5000L).times(2)).handleError(isA(ErrorMessage.class));
    }

    // FAILS as written: Jackson parses the leading "{}" of "{}}}"
    // as an empty object and ignores the trailing tokens, so the record is
    // delivered to the consumer (with null fields) instead of the error
    // channel. See FAIL_ON_TRAILING_TOKENS for the remedy.
    @Test
    public void rejectCorruptedMessage2() throws ExecutionException, InterruptedException {
        kafkaTemplate.send(TOPIC, "{}}}").get(); // synchronous send
        verify(cloudStreamErrorHandler, timeout(5000L).times(2)).handleError(isA(ErrorMessage.class));
    }
}
spring.cloud.stream.kafka.bindings.consumer-in-0.consumer.configuration.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.consumer-in-0.consumer.configuration.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.cloud.stream.kafka.bindings.consumer-in-0.consumer.configuration.key.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.cloud.stream.kafka.bindings.consumer-in-0.consumer.configuration.value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
# Producer only for testing purposes ('#' starts a comment in .properties; '//' would be parsed as part of a key)
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
在 rejectCorruptedMessage 测试方法中,无效的 JSON 触发了 handleError(ErrorMessage errorMessage)
方法,这是意料之中的,因为它是无效的 JSON。另一方面,
在 rejectCorruptedMessage2 测试方法中,JSON 触发的是 TokenEventConsumer 类中的 Consumer&lt;KafkaEventRecord&gt; consumer()
方法,这不是预期的行为;而且我得到的 KafkaEventRecord 对象的字段全部为 null。
Jackson 不认为这是无效的 JSON,它只是忽略尾随 }}
并将 {}
解码为空对象。
/**
 * Minimal reproduction of why "{}}}" is NOT rejected: Jackson's
 * ObjectReader.readValue stops after the first complete JSON value ("{}")
 * and, by default, silently ignores any trailing tokens. Enabling
 * DeserializationFeature.FAIL_ON_TRAILING_TOKENS on the ObjectMapper makes
 * such input throw instead.
 */
public class So67804599Application {

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Well-formed input binds normally.
        Object foo = mapper.readerFor(Foo.class).readValue("{\"bar\":\"baz\"}");
        System.out.println(foo);
        // "{}" parses as an empty object; the trailing "}}" is ignored,
        // yielding a Foo with a null field rather than an exception.
        foo = mapper.readerFor(Foo.class).readValue("{}}}");
        System.out.println(foo);
    }

    /** Simple mutable POJO target for the deserialization demo. */
    public static class Foo {

        String bar;

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }
    }
}
Foo [bar=baz]
Foo [bar=null]