使用 Spring-Kafka 和 Confluent 模式注册表将带有 JSON 模式的记录发送到 Kafka
Send Record with JSON Schema to Kafka using Spring-Kafka and Confluent schema registry
我在网上找不到任何关于如何使用 Spring Kafka 将带有 JSON 模式(JSON Schema)的记录发送到 Kafka 的资料。应该怎么做?
花了几个小时之后,我发现有 3 种不同的方式可以发送带有 JSON 模式的记录。相关逻辑在 io.confluent.kafka.schemaregistry.json.JsonSchemaUtils 中实现。
以下是摘录:
// Excerpt only: the enclosing method signature is not shown here. The code picks one of
// three ways to obtain the JsonSchema for `object`.

// Path 1: the object is an envelope, so the schema is read out of the envelope node.
if (isEnvelope(object)) {
return getSchemaFromEnvelope((JsonNode) object);
}
Class<?> cls = object.getClass();
// Path 2: the object's class is annotated with @Schema; the schema text comes from
// schema.value() and its references from schema.refs().
if (cls.isAnnotationPresent(Schema.class)) {
Schema schema = (Schema) cls.getAnnotation(Schema.class);
List<SchemaReference> references = Arrays.stream(schema.refs())
.map(ref -> new SchemaReference(ref.name(), ref.subject(), ref.version()))
.collect(Collectors.toList());
if (client == null) {
// Without a client, a schema that declares references is rejected.
if (!references.isEmpty()) {
throw new IllegalArgumentException("Cannot resolve schema " + schema.value()
+ " with refs " + references);
}
return new JsonSchema(schema.value());
} else {
// With a client, parsing is delegated to it; an empty result is reported as an
// IOException.
return (JsonSchema) client.parseSchema(JsonSchema.TYPE, schema.value(), references)
.orElseThrow(() -> new IOException("Invalid schema " + schema.value()
+ " with refs " + references));
}
}
// Path 3: no envelope and no annotation, so a schema is generated from the class.
JsonSchemaConfig config = getConfig(useOneofForNullables, failUnknownProperties);
JsonSchemaDraft draft;
// Map the requested spec version onto the generator's draft enum; any value not listed
// falls back to draft 7.
switch (specVersion) {
case DRAFT_4:
draft = JsonSchemaDraft.DRAFT_04;
break;
case DRAFT_6:
draft = JsonSchemaDraft.DRAFT_06;
break;
case DRAFT_7:
draft = JsonSchemaDraft.DRAFT_07;
break;
case DRAFT_2019_09:
draft = JsonSchemaDraft.DRAFT_2019_09;
break;
default:
draft = JsonSchemaDraft.DRAFT_07;
break;
}
config = config.withJsonSchemaDraft(draft);
JsonSchemaGenerator jsonSchemaGenerator = new JsonSchemaGenerator(objectMapper, config);
JsonNode jsonSchema = jsonSchemaGenerator.generateJsonSchema(cls);
return new JsonSchema(jsonSchema);
所以你有 3 种选择:
- 创建一个同时包含模式(schema)和有效负载(payload)字段的 JsonNode
- 用 @Schema 注解标注你的类
- 不提供模式,由模式生成器根据类自动生成
我选择了第 1 种方式,代码如下:
/**
 * Sends records whose key and value are each wrapped in a JSON Schema envelope
 * (schema + payload) before being handed to the underlying {@code KafkaTemplate}.
 *
 * <p>Schemas are read from the classpath using the naming convention
 * {@code <topic>-key.json} and {@code <topic>-value.json}, and are cached per resource path
 * after the first load.
 *
 * <p>NOTE(review): the {@code final} collaborators have no visible constructor here;
 * presumably one is generated or was omitted from the snippet — confirm.
 */
public class MyKafkaTemplate {

    private static final String SCHEMA_POSTFIX_KEY = "-key.json";
    private static final String SCHEMA_POSTFIX_VALUE = "-value.json";

    private final KafkaTemplate<JsonNode, JsonNode> kafkaTemplate;
    private final ObjectMapper objectMapper;

    /** Parsed schema documents, keyed by classpath resource path. */
    private final Map<String, JsonNode> topicSchemaCache = new ConcurrentHashMap<>();

    /**
     * Wraps {@code key} and {@code value} in schema envelopes and sends them to {@code topic}.
     *
     * @param topic target topic; also the prefix of the schema resource names
     * @param key   record key, converted to a JSON tree via the object mapper
     * @param value record value, converted to a JSON tree via the object mapper
     */
    public <K, V> void send(final String topic, final K key, final V value) {
        final JsonNode keyNode = getEnvelope(topic + SCHEMA_POSTFIX_KEY, key);
        final JsonNode valueNode = getEnvelope(topic + SCHEMA_POSTFIX_VALUE, value);
        // NOTE(review): the result of send(...) is discarded, so delivery failures are not
        // surfaced to the caller from here.
        kafkaTemplate.send(topic, keyNode, valueNode);
    }

    /**
     * Builds the envelope node for one side of the record (key or value).
     *
     * @param schemaFilePath classpath location of the schema to attach
     * @param data           object to serialize as the envelope payload
     * @return the node produced by {@code JsonSchemaUtils.envelope(schema, payload)}
     */
    private JsonNode getEnvelope(final String schemaFilePath, final Object data) {
        final JsonNode schemaNode = getOrLoadSchema(schemaFilePath);
        final JsonNode payload = objectMapper.valueToTree(data);
        return JsonSchemaUtils.envelope(schemaNode, payload);
    }

    /** Returns the cached schema for the path, loading it on first use. */
    private JsonNode getOrLoadSchema(final String schemaFilePath) {
        return topicSchemaCache.computeIfAbsent(schemaFilePath, this::readFileToJsonNode);
    }

    /**
     * Parses the schema resource at {@code schemaFilePath} into a JSON tree.
     *
     * <p>The resource is read as a stream rather than via {@code getFile()}: a classpath
     * resource packaged inside a JAR is not a file on the file system, so {@code getFile()}
     * fails there while {@code getInputStream()} works in both layouts.
     */
    @SneakyThrows
    private JsonNode readFileToJsonNode(final String schemaFilePath) {
        try (java.io.InputStream schemaStream =
                new ClassPathResource(schemaFilePath).getInputStream()) {
            return objectMapper.readTree(schemaStream);
        }
    }
}
我在网上找不到任何关于如何使用 Spring Kafka 将带有 JSON 模式(JSON Schema)的记录发送到 Kafka 的资料。应该怎么做?
花了几个小时之后,我发现有 3 种不同的方式可以发送带有 JSON 模式的记录。相关逻辑在 io.confluent.kafka.schemaregistry.json.JsonSchemaUtils 中实现。
以下是摘录:
// Excerpt only: the enclosing method signature is not shown here. The code picks one of
// three ways to obtain the JsonSchema for `object`.

// Path 1: the object is an envelope, so the schema is read out of the envelope node.
if (isEnvelope(object)) {
return getSchemaFromEnvelope((JsonNode) object);
}
Class<?> cls = object.getClass();
// Path 2: the object's class is annotated with @Schema; the schema text comes from
// schema.value() and its references from schema.refs().
if (cls.isAnnotationPresent(Schema.class)) {
Schema schema = (Schema) cls.getAnnotation(Schema.class);
List<SchemaReference> references = Arrays.stream(schema.refs())
.map(ref -> new SchemaReference(ref.name(), ref.subject(), ref.version()))
.collect(Collectors.toList());
if (client == null) {
// Without a client, a schema that declares references is rejected.
if (!references.isEmpty()) {
throw new IllegalArgumentException("Cannot resolve schema " + schema.value()
+ " with refs " + references);
}
return new JsonSchema(schema.value());
} else {
// With a client, parsing is delegated to it; an empty result is reported as an
// IOException.
return (JsonSchema) client.parseSchema(JsonSchema.TYPE, schema.value(), references)
.orElseThrow(() -> new IOException("Invalid schema " + schema.value()
+ " with refs " + references));
}
}
// Path 3: no envelope and no annotation, so a schema is generated from the class.
JsonSchemaConfig config = getConfig(useOneofForNullables, failUnknownProperties);
JsonSchemaDraft draft;
// Map the requested spec version onto the generator's draft enum; any value not listed
// falls back to draft 7.
switch (specVersion) {
case DRAFT_4:
draft = JsonSchemaDraft.DRAFT_04;
break;
case DRAFT_6:
draft = JsonSchemaDraft.DRAFT_06;
break;
case DRAFT_7:
draft = JsonSchemaDraft.DRAFT_07;
break;
case DRAFT_2019_09:
draft = JsonSchemaDraft.DRAFT_2019_09;
break;
default:
draft = JsonSchemaDraft.DRAFT_07;
break;
}
config = config.withJsonSchemaDraft(draft);
JsonSchemaGenerator jsonSchemaGenerator = new JsonSchemaGenerator(objectMapper, config);
JsonNode jsonSchema = jsonSchemaGenerator.generateJsonSchema(cls);
return new JsonSchema(jsonSchema);
所以你有 3 种选择:
- 创建一个同时包含模式(schema)和有效负载(payload)字段的 JsonNode
- 用 @Schema 注解标注你的类
- 不提供模式,由模式生成器根据类自动生成
我选择了第 1 种方式,代码如下:
/**
 * Sends records whose key and value are each wrapped in a JSON Schema envelope
 * (schema + payload) before being handed to the underlying {@code KafkaTemplate}.
 *
 * <p>Schemas are read from the classpath using the naming convention
 * {@code <topic>-key.json} and {@code <topic>-value.json}, and are cached per resource path
 * after the first load.
 *
 * <p>NOTE(review): the {@code final} collaborators have no visible constructor here;
 * presumably one is generated or was omitted from the snippet — confirm.
 */
public class MyKafkaTemplate {

    private static final String SCHEMA_POSTFIX_KEY = "-key.json";
    private static final String SCHEMA_POSTFIX_VALUE = "-value.json";

    private final KafkaTemplate<JsonNode, JsonNode> kafkaTemplate;
    private final ObjectMapper objectMapper;

    /** Parsed schema documents, keyed by classpath resource path. */
    private final Map<String, JsonNode> topicSchemaCache = new ConcurrentHashMap<>();

    /**
     * Wraps {@code key} and {@code value} in schema envelopes and sends them to {@code topic}.
     *
     * @param topic target topic; also the prefix of the schema resource names
     * @param key   record key, converted to a JSON tree via the object mapper
     * @param value record value, converted to a JSON tree via the object mapper
     */
    public <K, V> void send(final String topic, final K key, final V value) {
        final JsonNode keyNode = getEnvelope(topic + SCHEMA_POSTFIX_KEY, key);
        final JsonNode valueNode = getEnvelope(topic + SCHEMA_POSTFIX_VALUE, value);
        // NOTE(review): the result of send(...) is discarded, so delivery failures are not
        // surfaced to the caller from here.
        kafkaTemplate.send(topic, keyNode, valueNode);
    }

    /**
     * Builds the envelope node for one side of the record (key or value).
     *
     * @param schemaFilePath classpath location of the schema to attach
     * @param data           object to serialize as the envelope payload
     * @return the node produced by {@code JsonSchemaUtils.envelope(schema, payload)}
     */
    private JsonNode getEnvelope(final String schemaFilePath, final Object data) {
        final JsonNode schemaNode = getOrLoadSchema(schemaFilePath);
        final JsonNode payload = objectMapper.valueToTree(data);
        return JsonSchemaUtils.envelope(schemaNode, payload);
    }

    /** Returns the cached schema for the path, loading it on first use. */
    private JsonNode getOrLoadSchema(final String schemaFilePath) {
        return topicSchemaCache.computeIfAbsent(schemaFilePath, this::readFileToJsonNode);
    }

    /**
     * Parses the schema resource at {@code schemaFilePath} into a JSON tree.
     *
     * <p>The resource is read as a stream rather than via {@code getFile()}: a classpath
     * resource packaged inside a JAR is not a file on the file system, so {@code getFile()}
     * fails there while {@code getInputStream()} works in both layouts.
     */
    @SneakyThrows
    private JsonNode readFileToJsonNode(final String schemaFilePath) {
        try (java.io.InputStream schemaStream =
                new ClassPathResource(schemaFilePath).getInputStream()) {
            return objectMapper.readTree(schemaStream);
        }
    }
}