Send Record with JSON Schema to Kafka using Spring-Kafka and Confluent schema registry

I couldn't find anything on the internet about how to send a record with a JSON schema to Kafka using Spring Kafka. How can I do this?

After spending a few hours on it, I found that there are 3 different ways to send a record with a JSON schema. The relevant part is implemented in io.confluent.kafka.schemaregistry.json.JsonSchemaUtils.

Here is the excerpt:

// 1) the object is already an envelope that carries its own schema
if (isEnvelope(object)) {
  return getSchemaFromEnvelope((JsonNode) object);
}
Class<?> cls = object.getClass();
// 2) the schema is taken from an @Schema annotation on the class
if (cls.isAnnotationPresent(Schema.class)) {
  Schema schema = (Schema) cls.getAnnotation(Schema.class);
  List<SchemaReference> references = Arrays.stream(schema.refs())
          .map(ref -> new SchemaReference(ref.name(), ref.subject(), ref.version()))
          .collect(Collectors.toList());
  if (client == null) {
    if (!references.isEmpty()) {
      throw new IllegalArgumentException("Cannot resolve schema " + schema.value()
              + " with refs " + references);
    }
    return new JsonSchema(schema.value());
  } else {
    return (JsonSchema) client.parseSchema(JsonSchema.TYPE, schema.value(), references)
            .orElseThrow(() -> new IOException("Invalid schema " + schema.value()
                    + " with refs " + references));
  }
}
// 3) no schema is provided, so one is generated from the class
JsonSchemaConfig config = getConfig(useOneofForNullables, failUnknownProperties);
JsonSchemaDraft draft;
switch (specVersion) {
  case DRAFT_4:
    draft = JsonSchemaDraft.DRAFT_04;
    break;
  case DRAFT_6:
    draft = JsonSchemaDraft.DRAFT_06;
    break;
  case DRAFT_7:
    draft = JsonSchemaDraft.DRAFT_07;
    break;
  case DRAFT_2019_09:
    draft = JsonSchemaDraft.DRAFT_2019_09;
    break;
  default:
    draft = JsonSchemaDraft.DRAFT_07;
    break;
}
config = config.withJsonSchemaDraft(draft);
JsonSchemaGenerator jsonSchemaGenerator = new JsonSchemaGenerator(objectMapper, config);
JsonNode jsonSchema = jsonSchemaGenerator.generateJsonSchema(cls);
return new JsonSchema(jsonSchema);
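
The isEnvelope branch at the top is the one that matters for the approach I ended up choosing: as far as I can tell, an envelope is simply a JsonNode with a "schema" field and a "payload" field, which is exactly what JsonSchemaUtils.envelope builds. A minimal, self-contained sketch (the field names follow my reading of the class and may differ between versions):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.schemaregistry.json.JsonSchemaUtils;

class EnvelopeSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode schema = mapper.readTree(
                "{\"type\":\"object\",\"properties\":{\"name\":{\"type\":\"string\"}}}");
        JsonNode payload = mapper.readTree("{\"name\":\"alice\"}");
        // Wraps both nodes into a single envelope node, roughly:
        // { "schema": { ... }, "payload": { "name": "alice" } }
        JsonNode envelope = JsonSchemaUtils.envelope(schema, payload);
        System.out.println(envelope);
    }
}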

So you have 3 possibilities:

  1. Create a JsonNode envelope that contains a schema field and a payload field
  2. Annotate your class with @Schema (see the sketch after this list)
  3. Provide no schema at all and let the schema generator derive one from the class
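
For completeness, here is roughly what option 2 could look like. This is only a sketch based on the @Schema/@SchemaReference annotations from Confluent's schema registry client; the User class and its schema string are made up for illustration:

import io.confluent.kafka.schemaregistry.annotations.Schema;

// The serializer takes the schema string from the annotation instead of generating one.
@Schema(value = "{\"$schema\":\"http://json-schema.org/draft-07/schema#\","
        + "\"title\":\"User\",\"type\":\"object\","
        + "\"properties\":{\"name\":{\"type\":\"string\"}}}",
        refs = {})
public class User {
    private String name;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}

Option 3 needs no extra code at all: you simply send a plain POJO and the JsonSchemaGenerator shown in the excerpt derives a draft-07 schema from the class.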

I went with option 1), using the following code:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.schemaregistry.json.JsonSchemaUtils;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.springframework.core.io.ClassPathResource;
import org.springframework.kafka.core.KafkaTemplate;

// Lombok generates the constructor for the final fields.
@RequiredArgsConstructor
public class MyKafkaTemplate {

    private static final String SCHEMA_POSTFIX_KEY = "-key.json";
    private static final String SCHEMA_POSTFIX_VALUE = "-value.json";

    private final KafkaTemplate<JsonNode, JsonNode> kafkaTemplate;
    private final ObjectMapper objectMapper;
    private final Map<String, JsonNode> topicSchemaCache = new ConcurrentHashMap<>();

    public <K, V> void send(final String topic, final K key, final V value) {
        // Key and value schemas are expected on the classpath as <topic>-key.json and <topic>-value.json.
        final JsonNode keyNode = getEnvelope(topic + SCHEMA_POSTFIX_KEY, key);
        final JsonNode valueNode = getEnvelope(topic + SCHEMA_POSTFIX_VALUE, value);
        kafkaTemplate.send(topic, keyNode, valueNode);
    }

    private JsonNode getEnvelope(final String schemaFilePath, final Object payload) {
        // Wrap the schema and the serialized payload into the envelope format
        // that JsonSchemaUtils.getSchema recognizes.
        final JsonNode schemaNode = getOrLoadSchema(schemaFilePath);
        final JsonNode payloadNode = objectMapper.valueToTree(payload);
        return JsonSchemaUtils.envelope(schemaNode, payloadNode);
    }

    private JsonNode getOrLoadSchema(final String schemaFilePath) {
        // Each schema file is read only once and then served from the cache.
        return topicSchemaCache.computeIfAbsent(schemaFilePath, path -> readFileToJsonNode(path));
    }

    @SneakyThrows
    private JsonNode readFileToJsonNode(final String schemaFilePath) {
        return objectMapper.readTree(new ClassPathResource(schemaFilePath).getFile());
    }

}
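
For the template above to actually validate anything and register schemas, the underlying KafkaTemplate has to be configured with Confluent's KafkaJsonSchemaSerializer for both key and value. A minimal configuration sketch, assuming a local broker and schema registry (the URLs are placeholders for your own environment):

import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaJsonSchemaProducerConfig {

    @Bean
    public ProducerFactory<JsonNode, JsonNode> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Both key and value go through the JSON Schema serializer, which validates the
        // record and registers the schema (by default under <topic>-key / <topic>-value).
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaJsonSchemaSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSchemaSerializer.class);
        props.put("schema.registry.url", "http://localhost:8081");
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<JsonNode, JsonNode> kafkaTemplate(
            final ProducerFactory<JsonNode, JsonNode> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}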