Kafka Streams with custom JSON serializer
This question follows an earlier one in which I asked about serializing a Kafka stream with custom Avro Serdes. Now I am hitting a different problem while trying to configure a JSON Serde. I have this Kafka Streams topology, in which I use groupByKey(Grouped.with(CustomSerdes.String(), CustomSerdes.Notification())):
@Service
@Slf4j
@EnableBinding(PosListenerAvroJsonBinding.class)
public class NotificationAvroJsonProcessorService {

    @Autowired
    RecordBuilder recordBuilder;

    @StreamListener("notification-input-avro-channel")
    @SendTo("notification-output-json-channel")
    public KStream<String, Notification> process(KStream<String, PosInvoiceAvro> input) {
        /* with reduce transformation and serialization with KTable */
        KStream<String, Notification> notificationJsonKStream = input
                .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
                .map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationJson(v)))
                // ***********************************************
                // THIS DOES NOT WORK WITH JSON, only works with AVRO.
                .groupByKey(Grouped.with(CustomSerdes.String(), CustomSerdes.Notification()))
                // ***********************************************
                .reduce((aggValue, newValue) -> {
                    newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
                    return newValue;
                })
                .toStream();

        notificationJsonKStream.foreach((k, v) -> log.info(String.format("Notification JSON agg - key: %s, value: %s", k, v)));
        return notificationJsonKStream;
    }
}
I defined the custom serializer based on this web page. I thought I had to use com.fasterxml.jackson.databind.JsonNode, but it did not work. I also tested the other options that are commented out below, but they did not work either.
public class CustomSerdes extends Serdes {

    private final static Map<String, String> serdeConfig = Stream.of(
            new AbstractMap.SimpleEntry<>(SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081")
            , new AbstractMap.SimpleEntry<>(JSON_VALUE_TYPE, "com.fasterxml.jackson.databind.JsonNode")
            // , new AbstractMap.SimpleEntry<>(JSON_VALUE_TYPE, "com.github.felipegutierrez.explore.spring.model.Notification")
            // , new AbstractMap.SimpleEntry<>(JSON_VALUE_TYPE, "com.fasterxml.jackson.databind.JavaType")
            , new AbstractMap.SimpleEntry<>(TYPE_PROPERTY, TYPE_PROPERTY_DEFAULT)
            // , new AbstractMap.SimpleEntry<>("json.value.type", "org.springframework.kafka.support.serializer.JsonSerializer")
    ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

    public static Serde<Notification> Notification() {
        final Serde<Notification> notificationSerde = new KafkaJsonSchemaSerde<Notification>();
        notificationSerde.configure(serdeConfig, false);
        return notificationSerde;
    }
}
The web page also says to set type.property=javaType for the Java class, and that the JSON schema could specify "javaType":"org.acme.MyRecord" at the top level.
@lombok.Data
@lombok.AllArgsConstructor
@lombok.NoArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonSchemaInject(strings = {@JsonSchemaString(path = "javaType", value = "com.github.felipegutierrez.explore.spring.model.Notification")})
public class Notification {

    @JsonProperty("InvoiceNumber")
    private String InvoiceNumber;

    @JsonProperty("CustomerCardNo")
    private String CustomerCardNo;

    @JsonProperty("TotalAmount")
    private Double TotalAmount;

    @JsonProperty("EarnedLoyaltyPoints")
    private Double EarnedLoyaltyPoints;

    @JsonProperty("TotalLoyaltyPoints")
    private Double TotalLoyaltyPoints = 0.0;
}
When I use the default JSON serializer for Spring + Kafka, I just set spring.json.add.type.headers: false in application.yaml and it works. But I could not find such a property for the Confluent serializer.
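For comparison, here is a minimal sketch of the Spring-side Serde I am referring to, assuming spring-kafka's org.springframework.kafka.support.serializer.JsonSerde is on the classpath (this is not the Confluent serde this question is about):

import org.apache.kafka.common.serialization.Serde;
import org.springframework.kafka.support.serializer.JsonSerde;

/* Minimal sketch (assumption: spring-kafka is on the classpath). Spring's JsonSerde
   is bound to the target class directly, so no type headers are needed on the wire. */
public final class SpringJsonSerdes {
    public static Serde<Notification> notification() {
        return new JsonSerde<>(Notification.class);
    }
}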
Finally, the error is below. I think the way to go is to pass the right parameters to my notificationSerde.configure(serdeConfig, false);, because when I change the JSON serializer there I can see the application trying to cast to different classes. But I do not know which configuration I have to put there.
Caused by: java.lang.ClassCastException: class
com.fasterxml.jackson.databind.node.ObjectNode cannot be cast to class
com.github.felipegutierrez.explore.spring.model.Notification
(com.fasterxml.jackson.databind.node.ObjectNode and
com.github.felipegutierrez.explore.spring.model.Notification are in
unnamed module of loader 'app')
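The cast fails because, with json.value.type set to com.fasterxml.jackson.databind.JsonNode, the deserializer hands back a generic ObjectNode, which the reduce step then tries to use as a Notification. Purely for illustration, a minimal sketch of converting such a node by hand with Jackson (the NotificationMapper helper below is hypothetical and not part of the code above):

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical helper: maps the generic JsonNode returned by the deserializer onto the POJO.
public final class NotificationMapper {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static Notification fromNode(JsonNode node) {
        try {
            return MAPPER.treeToValue(node, Notification.class);
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException("Cannot map JsonNode to Notification", e);
        }
    }
}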
I fixed it with this configuration of the Serde:
@Service
public class CustomSerdes extends Serdes {

    private final static Map<String, String> serdeConfig = Stream.of(
            new AbstractMap.SimpleEntry<>(SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081")
            , new AbstractMap.SimpleEntry<>(FAIL_INVALID_SCHEMA, "true")
            , new AbstractMap.SimpleEntry<>(JSON_VALUE_TYPE, Notification.class.getName()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

    public static Serde<Notification> Notification() {
        final Serde<Notification> notificationSerde = new KafkaJsonSchemaSerde<>();
        notificationSerde.configure(serdeConfig, false);
        return notificationSerde;
    }
}
and added Materialized.with(CustomSerdes.String(), CustomSerdes.Notification()) to the reducer as well:
KStream<String, Notification> notificationJsonKStream = input
        .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
        .map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationJson(v)))
        .groupByKey(Grouped.with(CustomSerdes.String(), CustomSerdes.Notification()))
        .reduce((aggValue, newValue) -> {
                    newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
                    return newValue;
                },
                Named.as("notification-reducer"),
                Materialized.with(CustomSerdes.String(), CustomSerdes.Notification()))
        .toStream();

notificationJsonKStream.foreach((k, v) -> log.info(String.format("Notification JSON agg - key: %s, value: %s", k, v)));
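For what it's worth, both places need the serdes: the Grouped.with(...) passed to groupByKey is used for the repartition topic that the grouping creates, while the Materialized.with(...) passed to reduce is used for the state store (and its changelog topic) that backs the aggregation. If either is left out, Kafka Streams falls back to the default serdes configured for the application.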