测试使用 SpecificAvroSerde 的 Kafka 处理器 API
Testing Kafka Processor API that uses SpecificAvroSerde
我正在尝试为自定义流处理器编写单元测试,但无法序列化我需要发送以进行测试的消息。我按照 kafka 的这个例子: https://kafka.apache.org/11/documentation/streams/developer-guide/testing.html 。我在我的流中使用 SpecificAvroSerde 作为自定义 class(自动生成的 avro class),但我无法在测试中使用 MockSchemaRegistryClient() 配置它,我只能指向 URL SR。
Serde<MyCustomObject> valueSerde = new SpecificAvroSerde<>();
Map<String, String> valueSerdeConfig = new HashMap<>();
valueSerdeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "fake");
valueSerdeConfig.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, "true");
valueSerde.configure(valueSerdeConfig, false);
ConsumerRecordFactory<Long, MyCustomObject> recordFactory = new ConsumerRecordFactory<>(new LongSerializer(), valueSerde.serializer());
使用 KafkaAvroSerializer 我可以像这样初始化它:
KafkaAvroSerializer serializer = new KafkaAvroSerializer(schemaRegistryClient);
但 ConsumerRecordFactory 不会将 KafkaAvroSerializer 作为参数。
是否有任何替代方法或我不知道的方法?
非常感谢任何帮助,谢谢。
我正在为所有 Avro 定义生成 Java 个模型:
#!/usr/bin/env bash
if [ ! -f avro-tools-1.8.2.jar ]; then
wget http://tux.rainside.sk/apache/avro/avro-1.8.2/java/avro-tools-1.8.2.jar
chmod +x avro-tools-1.8.2.jar
fi
java -jar avro-tools-1.8.2.jar compile schema ../avro/raw/* ../../java/
然后我只生成一些消息到模拟的 Kafka 集群中
public abstract class ViewPageEventGenerator {
@NotNull
public static KeyValue<List, HashMap<String, String>> getSimpleViewPages() {
List<KeyValue<CustomerKey, ViewPage>> inputValues = new ArrayList<>();
HashMap<String, String> requestExpectedValuePairs = new HashMap<>();
inputValues = Arrays.asList(
new KeyValue<>(
new CustomerKey(1912, "Alan Turing"),
new ViewPage("Alan Turing", false,
Double.parseDouble(String.valueOf(System.currentTimeMillis())),
"https://alan.turing/", "192.168.0.1", "Turing Machine", "Punch card"
)
),
new KeyValue<>(
new CustomerKey(1912, "Alan Turing"),
new ViewPage("Alan Turing", false,
Double.parseDouble(String.valueOf(System.currentTimeMillis() + 100)),
"https://alan.turing/", "192.168.0.1", "Turing Machine", "Punch card"
)
),
new KeyValue<>(
new CustomerKey(1912, "Alan Turing"),
new ViewPage("Alan Turing", false,
Double.parseDouble(String.valueOf(System.currentTimeMillis() + 200)),
"https://alan.turing/", "192.168.0.1", "Turing Machine", "Punch card"
)
)
);
requestExpectedValuePairs.put(
"{project_id: 1912}",
"{\"success\":true,\"data\":{\"count\":3}}"
);
return new KeyValue<>(inputValues, requestExpectedValuePairs);
}
}
就是这样。在拓扑中,我使用基于 Avro 定义生成的 Java 模型 (类)。
感谢您提出的解决方案,但我今天确实找到了一个。 Serdes 由序列化器和反序列化器组成:https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Serdes.html#serdeFrom-org.apache.kafka.common.serialization.Serializer-org.apache.kafka.common.serialization.Deserializer- 所以我从 KafkaAvroSerializer 和 KafkaAvroDeserializer 构建了我的 Serde,如下所示:
Serde serde = Serdes.serdeFrom(new KafkaAvroSerializer(client), new KafkaAvroDeserializer(client));
每个实现 Serializer< T > 的 class 都可以是 Serde 的一部分。
我正在尝试为自定义流处理器编写单元测试,但无法序列化我需要发送以进行测试的消息。我按照 kafka 的这个例子: https://kafka.apache.org/11/documentation/streams/developer-guide/testing.html 。我在我的流中使用 SpecificAvroSerde 作为自定义 class(自动生成的 avro class),但我无法在测试中使用 MockSchemaRegistryClient() 配置它,我只能指向 URL SR。
Serde<MyCustomObject> valueSerde = new SpecificAvroSerde<>();
Map<String, String> valueSerdeConfig = new HashMap<>();
valueSerdeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "fake");
valueSerdeConfig.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, "true");
valueSerde.configure(valueSerdeConfig, false);
ConsumerRecordFactory<Long, MyCustomObject> recordFactory = new ConsumerRecordFactory<>(new LongSerializer(), valueSerde.serializer());
使用 KafkaAvroSerializer 我可以像这样初始化它:
KafkaAvroSerializer serializer = new KafkaAvroSerializer(schemaRegistryClient);
但 ConsumerRecordFactory 不会将 KafkaAvroSerializer 作为参数。
是否有任何替代方法或我不知道的方法?
非常感谢任何帮助,谢谢。
我正在为所有 Avro 定义生成 Java 个模型:
#!/usr/bin/env bash
if [ ! -f avro-tools-1.8.2.jar ]; then
wget http://tux.rainside.sk/apache/avro/avro-1.8.2/java/avro-tools-1.8.2.jar
chmod +x avro-tools-1.8.2.jar
fi
java -jar avro-tools-1.8.2.jar compile schema ../avro/raw/* ../../java/
然后我只生成一些消息到模拟的 Kafka 集群中
public abstract class ViewPageEventGenerator {
@NotNull
public static KeyValue<List, HashMap<String, String>> getSimpleViewPages() {
List<KeyValue<CustomerKey, ViewPage>> inputValues = new ArrayList<>();
HashMap<String, String> requestExpectedValuePairs = new HashMap<>();
inputValues = Arrays.asList(
new KeyValue<>(
new CustomerKey(1912, "Alan Turing"),
new ViewPage("Alan Turing", false,
Double.parseDouble(String.valueOf(System.currentTimeMillis())),
"https://alan.turing/", "192.168.0.1", "Turing Machine", "Punch card"
)
),
new KeyValue<>(
new CustomerKey(1912, "Alan Turing"),
new ViewPage("Alan Turing", false,
Double.parseDouble(String.valueOf(System.currentTimeMillis() + 100)),
"https://alan.turing/", "192.168.0.1", "Turing Machine", "Punch card"
)
),
new KeyValue<>(
new CustomerKey(1912, "Alan Turing"),
new ViewPage("Alan Turing", false,
Double.parseDouble(String.valueOf(System.currentTimeMillis() + 200)),
"https://alan.turing/", "192.168.0.1", "Turing Machine", "Punch card"
)
)
);
requestExpectedValuePairs.put(
"{project_id: 1912}",
"{\"success\":true,\"data\":{\"count\":3}}"
);
return new KeyValue<>(inputValues, requestExpectedValuePairs);
}
}
就是这样。在拓扑中,我使用基于 Avro 定义生成的 Java 模型 (类)。
感谢您提出的解决方案,但我今天确实找到了一个。 Serdes 由序列化器和反序列化器组成:https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Serdes.html#serdeFrom-org.apache.kafka.common.serialization.Serializer-org.apache.kafka.common.serialization.Deserializer- 所以我从 KafkaAvroSerializer 和 KafkaAvroDeserializer 构建了我的 Serde,如下所示:
Serde serde = Serdes.serdeFrom(new KafkaAvroSerializer(client), new KafkaAvroDeserializer(client));
每个实现 Serializer< T > 的 class 都可以是 Serde 的一部分。