序列化 Avro 消息时出错
Error serializing Avro message
我在我的项目中使用带 Spring Boot 的 Kafka Streams。在我的用例中,我通过使用 SpecificAvroSerializer
序列化并使用 KStream API 来发送一个 Order 对象。当我使用 KafkaProducer 发送对象时,出现以下异常
nested exception is org.apache.kafka.common.errors.SerializationException: Error serializing Avro message] with root cause. java.lang.NullPointerException: null
基于Confluent examples. Not sure where I made a mistake. I would really appreciate any help. Code uploaded to Github开发本项目,供参考。
异常:
2018-04-17 16:19:39.170 ERROR 6161 --- [nio-8090-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.apache.kafka.common.errors.SerializationException: Error serializing Avro message] with root cause
java.lang.NullPointerException: null
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:57) ~[kafka-schema-registry-client-3.0.0.jar:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:89) ~[kafka-schema-registry-client-3.0.0.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72) ~[kafka-avro-serializer-3.0.0.jar:na]
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54) ~[kafka-avro-serializer-3.0.0.jar:na]
at com.kafkastream.stream.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:47) ~[classes/:na]
at com.kafkastream.stream.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:12) ~[classes/:na]
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:791) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:768) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:656) ~[kafka-clients-1.0.1.jar:na]
at com.kafkastream.service.EventsSender.sendCustomerEvent(EventsSender.java:57) ~[classes/:na]
at com.kafkastream.web.EventsController.sendCustomers(EventsController.java:53) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_144]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_144]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_144]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_144]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:877) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:783) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:974) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:866) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:635) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:851) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:742) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) ~[tomcat-embed-websocket-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:496) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:803) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:790) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-8.5.29.jar:8.5.29]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_144]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_144]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.5.29.jar:8.5.29]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
终于,我能够解决这个问题了。我在使用 Avro serde 时犯了一个基本错误。模型 classes 由 avro 工具基于 Customer.avsc 或 Order.avsc 文件自动生成。我正在手动创建这些 classes 并用它们实现特定记录 class。实现后,覆盖方法即 getSchema()、get、put 没有任何代码。当编译器执行 getSchema()
语句时,它返回 null
并抛出 Null Pointer Exception
。
package com.kafkastream.model;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecord;
import java.util.Objects;
public class Customer implements SpecificRecord
{
public String customerId;
public String firstName;
public String lastName;
public String email;
public String phone;
public Customer()
{
}
public Customer(String customerId, String firstName, String lastName, String email, String phone)
{
this.customerId = customerId;
this.firstName = firstName;
this.lastName = lastName;
this.email = email;
this.phone = phone;
}
// Getters and Setters here
@Override
public void put(int i, Object o)
{
}
@Override
public Object get(int i)
{
return null;
}
@Override
public Schema getSchema()
{
return null;
}
}
经过一些研究,我对项目做了一些改动。
- 为项目添加了
avro-tools
依赖
- 已将 avro 生成器插件添加到 pom.xml 文件
插件:
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
<goal>protocol</goal>
<goal>idl-protocol</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
- 在 ../resources/avro 文件夹
下创建了 Customer.avsc、Order.avsc 和 Greetings.avsc 文件
- 在此之后编译项目
mvn clean package --DskipTests
或 Right click on project Maven --> Generate sources
编译项目通过 avro 工具自动在模型文件夹中创建模型 classes(Customer class,如下所示)
Customer.java(由 avro 生成)
/**
* Autogenerated by Avro
*
* DO NOT EDIT DIRECTLY
*/
package com.kafkastream.model;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.SchemaStore;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Customer extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
private static final long serialVersionUID = 2729048783015827572L;
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Customer\",\"namespace\":\"com.kafkastream.model\",\"fields\":[{\"name\":\"customerId\",\"type\":\"string\"},{\"name\":\"firstName\",\"type\":\"string\"},{\"name\":\"lastName\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"phone\",\"type\":\"string\"}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
private static SpecificData MODEL$ = new SpecificData();
private static final BinaryMessageEncoder<Customer> ENCODER =
new BinaryMessageEncoder<Customer>(MODEL$, SCHEMA$);
private static final BinaryMessageDecoder<Customer> DECODER =
new BinaryMessageDecoder<Customer>(MODEL$, SCHEMA$);
/**
* Return the BinaryMessageDecoder instance used by this class.
*/
public static BinaryMessageDecoder<Customer> getDecoder() {
return DECODER;
}
/**
* Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}.
* @param resolver a {@link SchemaStore} used to find schemas by fingerprint
*/
public static BinaryMessageDecoder<Customer> createDecoder(SchemaStore resolver) {
return new BinaryMessageDecoder<Customer>(MODEL$, SCHEMA$, resolver);
}
/** Serializes this Customer to a ByteBuffer. */
public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
return ENCODER.encode(this);
}
/** Deserializes a Customer from a ByteBuffer. */
public static Customer fromByteBuffer(
java.nio.ByteBuffer b) throws java.io.IOException {
return DECODER.decode(b);
}
@Deprecated public java.lang.CharSequence customerId;
@Deprecated public java.lang.CharSequence firstName;
@Deprecated public java.lang.CharSequence lastName;
@Deprecated public java.lang.CharSequence email;
@Deprecated public java.lang.CharSequence phone;
/**
* Default constructor. Note that this does not initialize fields
* to their default values from the schema. If that is desired then
* one should use <code>newBuilder()</code>.
*/
public Customer() {}
/**
* All-args constructor.
* @param customerId The new value for customerId
* @param firstName The new value for firstName
* @param lastName The new value for lastName
* @param email The new value for email
* @param phone The new value for phone
*/
public Customer(java.lang.CharSequence customerId, java.lang.CharSequence firstName, java.lang.CharSequence lastName, java.lang.CharSequence email, java.lang.CharSequence phone) {
this.customerId = customerId;
this.firstName = firstName;
this.lastName = lastName;
this.email = email;
this.phone = phone;
}
public org.apache.avro.Schema getSchema() { return SCHEMA$; }
// Used by DatumWriter. Applications should not call.
public java.lang.Object get(int field$) {
switch (field$) {
case 0: return customerId;
case 1: return firstName;
case 2: return lastName;
case 3: return email;
case 4: return phone;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
// Used by DatumReader. Applications should not call.
@SuppressWarnings(value="unchecked")
public void put(int field$, java.lang.Object value$) {
switch (field$) {
case 0: customerId = (java.lang.CharSequence)value$; break;
case 1: firstName = (java.lang.CharSequence)value$; break;
case 2: lastName = (java.lang.CharSequence)value$; break;
case 3: email = (java.lang.CharSequence)value$; break;
case 4: phone = (java.lang.CharSequence)value$; break;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
/**
* Gets the value of the 'customerId' field.
* @return The value of the 'customerId' field.
*/
public java.lang.CharSequence getCustomerId() {
return customerId;
}
/**
* Sets the value of the 'customerId' field.
* @param value the value to set.
*/
public void setCustomerId(java.lang.CharSequence value) {
this.customerId = value;
}
/**
* Gets the value of the 'firstName' field.
* @return The value of the 'firstName' field.
*/
public java.lang.CharSequence getFirstName() {
return firstName;
}
/**
* Sets the value of the 'firstName' field.
* @param value the value to set.
*/
public void setFirstName(java.lang.CharSequence value) {
this.firstName = value;
}
/**
* Gets the value of the 'lastName' field.
* @return The value of the 'lastName' field.
*/
public java.lang.CharSequence getLastName() {
return lastName;
}
/**
* Sets the value of the 'lastName' field.
* @param value the value to set.
*/
public void setLastName(java.lang.CharSequence value) {
this.lastName = value;
}
/**
* Gets the value of the 'email' field.
* @return The value of the 'email' field.
*/
public java.lang.CharSequence getEmail() {
return email;
}
/**
* Sets the value of the 'email' field.
* @param value the value to set.
*/
public void setEmail(java.lang.CharSequence value) {
this.email = value;
}
/**
* Gets the value of the 'phone' field.
* @return The value of the 'phone' field.
*/
public java.lang.CharSequence getPhone() {
return phone;
}
/**
* Sets the value of the 'phone' field.
* @param value the value to set.
*/
public void setPhone(java.lang.CharSequence value) {
this.phone = value;
}
/**
* Creates a new Customer RecordBuilder.
* @return A new Customer RecordBuilder
*/
public static com.kafkastream.model.Customer.Builder newBuilder() {
return new com.kafkastream.model.Customer.Builder();
}
/**
* Creates a new Customer RecordBuilder by copying an existing Builder.
* @param other The existing builder to copy.
* @return A new Customer RecordBuilder
*/
public static com.kafkastream.model.Customer.Builder newBuilder(com.kafkastream.model.Customer.Builder other) {
return new com.kafkastream.model.Customer.Builder(other);
}
/**
* Creates a new Customer RecordBuilder by copying an existing Customer instance.
* @param other The existing instance to copy.
* @return A new Customer RecordBuilder
*/
public static com.kafkastream.model.Customer.Builder newBuilder(com.kafkastream.model.Customer other) {
return new com.kafkastream.model.Customer.Builder(other);
}
/**
* RecordBuilder for Customer instances.
*/
public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Customer>
implements org.apache.avro.data.RecordBuilder<Customer> {
private java.lang.CharSequence customerId;
private java.lang.CharSequence firstName;
private java.lang.CharSequence lastName;
private java.lang.CharSequence email;
private java.lang.CharSequence phone;
/** Creates a new Builder */
private Builder() {
super(SCHEMA$);
}
/**
* Creates a Builder by copying an existing Builder.
* @param other The existing Builder to copy.
*/
private Builder(com.kafkastream.model.Customer.Builder other) {
super(other);
if (isValidValue(fields()[0], other.customerId)) {
this.customerId = data().deepCopy(fields()[0].schema(), other.customerId);
fieldSetFlags()[0] = true;
}
if (isValidValue(fields()[1], other.firstName)) {
this.firstName = data().deepCopy(fields()[1].schema(), other.firstName);
fieldSetFlags()[1] = true;
}
if (isValidValue(fields()[2], other.lastName)) {
this.lastName = data().deepCopy(fields()[2].schema(), other.lastName);
fieldSetFlags()[2] = true;
}
if (isValidValue(fields()[3], other.email)) {
this.email = data().deepCopy(fields()[3].schema(), other.email);
fieldSetFlags()[3] = true;
}
if (isValidValue(fields()[4], other.phone)) {
this.phone = data().deepCopy(fields()[4].schema(), other.phone);
fieldSetFlags()[4] = true;
}
}
/**
* Creates a Builder by copying an existing Customer instance
* @param other The existing instance to copy.
*/
private Builder(com.kafkastream.model.Customer other) {
super(SCHEMA$);
if (isValidValue(fields()[0], other.customerId)) {
this.customerId = data().deepCopy(fields()[0].schema(), other.customerId);
fieldSetFlags()[0] = true;
}
if (isValidValue(fields()[1], other.firstName)) {
this.firstName = data().deepCopy(fields()[1].schema(), other.firstName);
fieldSetFlags()[1] = true;
}
if (isValidValue(fields()[2], other.lastName)) {
this.lastName = data().deepCopy(fields()[2].schema(), other.lastName);
fieldSetFlags()[2] = true;
}
if (isValidValue(fields()[3], other.email)) {
this.email = data().deepCopy(fields()[3].schema(), other.email);
fieldSetFlags()[3] = true;
}
if (isValidValue(fields()[4], other.phone)) {
this.phone = data().deepCopy(fields()[4].schema(), other.phone);
fieldSetFlags()[4] = true;
}
}
/**
* Gets the value of the 'customerId' field.
* @return The value.
*/
public java.lang.CharSequence getCustomerId() {
return customerId;
}
/**
* Sets the value of the 'customerId' field.
* @param value The value of 'customerId'.
* @return This builder.
*/
public com.kafkastream.model.Customer.Builder setCustomerId(java.lang.CharSequence value) {
validate(fields()[0], value);
this.customerId = value;
fieldSetFlags()[0] = true;
return this;
}
/**
* Checks whether the 'customerId' field has been set.
* @return True if the 'customerId' field has been set, false otherwise.
*/
public boolean hasCustomerId() {
return fieldSetFlags()[0];
}
/**
* Clears the value of the 'customerId' field.
* @return This builder.
*/
public com.kafkastream.model.Customer.Builder clearCustomerId() {
customerId = null;
fieldSetFlags()[0] = false;
return this;
}
/**
* Gets the value of the 'firstName' field.
* @return The value.
*/
public java.lang.CharSequence getFirstName() {
return firstName;
}
/**
* Sets the value of the 'firstName' field.
* @param value The value of 'firstName'.
* @return This builder.
*/
public com.kafkastream.model.Customer.Builder setFirstName(java.lang.CharSequence value) {
validate(fields()[1], value);
this.firstName = value;
fieldSetFlags()[1] = true;
return this;
}
/**
* Checks whether the 'firstName' field has been set.
* @return True if the 'firstName' field has been set, false otherwise.
*/
public boolean hasFirstName() {
return fieldSetFlags()[1];
}
/**
* Clears the value of the 'firstName' field.
* @return This builder.
*/
public com.kafkastream.model.Customer.Builder clearFirstName() {
firstName = null;
fieldSetFlags()[1] = false;
return this;
}
/**
* Gets the value of the 'lastName' field.
* @return The value.
*/
public java.lang.CharSequence getLastName() {
return lastName;
}
/**
* Sets the value of the 'lastName' field.
* @param value The value of 'lastName'.
* @return This builder.
*/
public com.kafkastream.model.Customer.Builder setLastName(java.lang.CharSequence value) {
validate(fields()[2], value);
this.lastName = value;
fieldSetFlags()[2] = true;
return this;
}
/**
* Checks whether the 'lastName' field has been set.
* @return True if the 'lastName' field has been set, false otherwise.
*/
public boolean hasLastName() {
return fieldSetFlags()[2];
}
/**
* Clears the value of the 'lastName' field.
* @return This builder.
*/
public com.kafkastream.model.Customer.Builder clearLastName() {
lastName = null;
fieldSetFlags()[2] = false;
return this;
}
/**
* Gets the value of the 'email' field.
* @return The value.
*/
public java.lang.CharSequence getEmail() {
return email;
}
/**
* Sets the value of the 'email' field.
* @param value The value of 'email'.
* @return This builder.
*/
public com.kafkastream.model.Customer.Builder setEmail(java.lang.CharSequence value) {
validate(fields()[3], value);
this.email = value;
fieldSetFlags()[3] = true;
return this;
}
/**
* Checks whether the 'email' field has been set.
* @return True if the 'email' field has been set, false otherwise.
*/
public boolean hasEmail() {
return fieldSetFlags()[3];
}
/**
* Clears the value of the 'email' field.
* @return This builder.
*/
public com.kafkastream.model.Customer.Builder clearEmail() {
email = null;
fieldSetFlags()[3] = false;
return this;
}
/**
* Gets the value of the 'phone' field.
* @return The value.
*/
public java.lang.CharSequence getPhone() {
return phone;
}
/**
* Sets the value of the 'phone' field.
* @param value The value of 'phone'.
* @return This builder.
*/
public com.kafkastream.model.Customer.Builder setPhone(java.lang.CharSequence value) {
validate(fields()[4], value);
this.phone = value;
fieldSetFlags()[4] = true;
return this;
}
/**
* Checks whether the 'phone' field has been set.
* @return True if the 'phone' field has been set, false otherwise.
*/
public boolean hasPhone() {
return fieldSetFlags()[4];
}
/**
* Clears the value of the 'phone' field.
* @return This builder.
*/
public com.kafkastream.model.Customer.Builder clearPhone() {
phone = null;
fieldSetFlags()[4] = false;
return this;
}
@Override
@SuppressWarnings("unchecked")
public Customer build() {
try {
Customer record = new Customer();
record.customerId = fieldSetFlags()[0] ? this.customerId : (java.lang.CharSequence) defaultValue(fields()[0]);
record.firstName = fieldSetFlags()[1] ? this.firstName : (java.lang.CharSequence) defaultValue(fields()[1]);
record.lastName = fieldSetFlags()[2] ? this.lastName : (java.lang.CharSequence) defaultValue(fields()[2]);
record.email = fieldSetFlags()[3] ? this.email : (java.lang.CharSequence) defaultValue(fields()[3]);
record.phone = fieldSetFlags()[4] ? this.phone : (java.lang.CharSequence) defaultValue(fields()[4]);
return record;
} catch (java.lang.Exception e) {
throw new org.apache.avro.AvroRuntimeException(e);
}
}
}
@SuppressWarnings("unchecked")
private static final org.apache.avro.io.DatumWriter<Customer>
WRITER$ = (org.apache.avro.io.DatumWriter<Customer>)MODEL$.createDatumWriter(SCHEMA$);
@Override public void writeExternal(java.io.ObjectOutput out)
throws java.io.IOException {
WRITER$.write(this, SpecificData.getEncoder(out));
}
@SuppressWarnings("unchecked")
private static final org.apache.avro.io.DatumReader<Customer>
READER$ = (org.apache.avro.io.DatumReader<Customer>)MODEL$.createDatumReader(SCHEMA$);
@Override public void readExternal(java.io.ObjectInput in)
throws java.io.IOException {
READER$.read(this, SpecificData.getDecoder(in));
}
}
我希望这 post 对某人有所帮助。
我在我的项目中使用带 Spring Boot 的 Kafka Streams。在我的用例中,我通过使用 SpecificAvroSerializer
序列化并使用 KStream API 来发送一个 Order 对象。当我使用 KafkaProducer 发送对象时,出现以下异常
nested exception is org.apache.kafka.common.errors.SerializationException: Error serializing Avro message] with root cause. java.lang.NullPointerException: null
基于Confluent examples. Not sure where I made a mistake. I would really appreciate any help. Code uploaded to Github开发本项目,供参考。
异常:
2018-04-17 16:19:39.170 ERROR 6161 --- [nio-8090-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.apache.kafka.common.errors.SerializationException: Error serializing Avro message] with root cause
java.lang.NullPointerException: null
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:57) ~[kafka-schema-registry-client-3.0.0.jar:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:89) ~[kafka-schema-registry-client-3.0.0.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72) ~[kafka-avro-serializer-3.0.0.jar:na]
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54) ~[kafka-avro-serializer-3.0.0.jar:na]
at com.kafkastream.stream.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:47) ~[classes/:na]
at com.kafkastream.stream.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:12) ~[classes/:na]
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:791) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:768) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:656) ~[kafka-clients-1.0.1.jar:na]
at com.kafkastream.service.EventsSender.sendCustomerEvent(EventsSender.java:57) ~[classes/:na]
at com.kafkastream.web.EventsController.sendCustomers(EventsController.java:53) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_144]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_144]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_144]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_144]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:877) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:783) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:974) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:866) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:635) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:851) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:742) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) ~[tomcat-embed-websocket-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:496) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:803) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:790) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-8.5.29.jar:8.5.29]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_144]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_144]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.5.29.jar:8.5.29]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
终于,我能够解决这个问题了。我在使用 Avro serde 时犯了一个基本错误。模型 classes 由 avro 工具基于 Customer.avsc 或 Order.avsc 文件自动生成。我正在手动创建这些 classes 并用它们实现特定记录 class。实现后,覆盖方法即 getSchema()、get、put 没有任何代码。当编译器执行 getSchema()
语句时,它返回 null
并抛出 Null Pointer Exception
。
package com.kafkastream.model;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecord;
import java.util.Objects;
public class Customer implements SpecificRecord
{
public String customerId;
public String firstName;
public String lastName;
public String email;
public String phone;
public Customer()
{
}
public Customer(String customerId, String firstName, String lastName, String email, String phone)
{
this.customerId = customerId;
this.firstName = firstName;
this.lastName = lastName;
this.email = email;
this.phone = phone;
}
// Getters and Setters here
@Override
public void put(int i, Object o)
{
}
@Override
public Object get(int i)
{
return null;
}
@Override
public Schema getSchema()
{
return null;
}
}
经过一些研究,我对项目做了一些改动。
- 为项目添加了
avro-tools
依赖 - 已将 avro 生成器插件添加到 pom.xml 文件
插件:
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
<goal>protocol</goal>
<goal>idl-protocol</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
- 在 ../resources/avro 文件夹 下创建了 Customer.avsc、Order.avsc 和 Greetings.avsc 文件
- 在此之后编译项目
mvn clean package --DskipTests
或Right click on project Maven --> Generate sources
编译项目通过 avro 工具自动在模型文件夹中创建模型 classes(Customer class,如下所示)
Customer.java(由 avro 生成)
/**
* Autogenerated by Avro
*
* DO NOT EDIT DIRECTLY
*/
package com.kafkastream.model;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.SchemaStore;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Customer extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
private static final long serialVersionUID = 2729048783015827572L;
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Customer\",\"namespace\":\"com.kafkastream.model\",\"fields\":[{\"name\":\"customerId\",\"type\":\"string\"},{\"name\":\"firstName\",\"type\":\"string\"},{\"name\":\"lastName\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"phone\",\"type\":\"string\"}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
private static SpecificData MODEL$ = new SpecificData();
private static final BinaryMessageEncoder<Customer> ENCODER =
new BinaryMessageEncoder<Customer>(MODEL$, SCHEMA$);
private static final BinaryMessageDecoder<Customer> DECODER =
new BinaryMessageDecoder<Customer>(MODEL$, SCHEMA$);
/**
* Return the BinaryMessageDecoder instance used by this class.
*/
public static BinaryMessageDecoder<Customer> getDecoder() {
return DECODER;
}
/**
* Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}.
* @param resolver a {@link SchemaStore} used to find schemas by fingerprint
*/
public static BinaryMessageDecoder<Customer> createDecoder(SchemaStore resolver) {
return new BinaryMessageDecoder<Customer>(MODEL$, SCHEMA$, resolver);
}
/** Serializes this Customer to a ByteBuffer. */
public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
return ENCODER.encode(this);
}
/** Deserializes a Customer from a ByteBuffer. */
public static Customer fromByteBuffer(
java.nio.ByteBuffer b) throws java.io.IOException {
return DECODER.decode(b);
}
@Deprecated public java.lang.CharSequence customerId;
@Deprecated public java.lang.CharSequence firstName;
@Deprecated public java.lang.CharSequence lastName;
@Deprecated public java.lang.CharSequence email;
@Deprecated public java.lang.CharSequence phone;
/**
* Default constructor. Note that this does not initialize fields
* to their default values from the schema. If that is desired then
* one should use <code>newBuilder()</code>.
*/
public Customer() {}
/**
* All-args constructor.
* @param customerId The new value for customerId
* @param firstName The new value for firstName
* @param lastName The new value for lastName
* @param email The new value for email
* @param phone The new value for phone
*/
public Customer(java.lang.CharSequence customerId, java.lang.CharSequence firstName, java.lang.CharSequence lastName, java.lang.CharSequence email, java.lang.CharSequence phone) {
this.customerId = customerId;
this.firstName = firstName;
this.lastName = lastName;
this.email = email;
this.phone = phone;
}
public org.apache.avro.Schema getSchema() { return SCHEMA$; }
// Used by DatumWriter. Applications should not call.
public java.lang.Object get(int field$) {
switch (field$) {
case 0: return customerId;
case 1: return firstName;
case 2: return lastName;
case 3: return email;
case 4: return phone;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
// Used by DatumReader. Applications should not call.
@SuppressWarnings(value="unchecked")
public void put(int field$, java.lang.Object value$) {
switch (field$) {
case 0: customerId = (java.lang.CharSequence)value$; break;
case 1: firstName = (java.lang.CharSequence)value$; break;
case 2: lastName = (java.lang.CharSequence)value$; break;
case 3: email = (java.lang.CharSequence)value$; break;
case 4: phone = (java.lang.CharSequence)value$; break;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
/**
* Gets the value of the 'customerId' field.
* @return The value of the 'customerId' field.
*/
public java.lang.CharSequence getCustomerId() {
return customerId;
}
/**
* Sets the value of the 'customerId' field.
* @param value the value to set.
*/
public void setCustomerId(java.lang.CharSequence value) {
this.customerId = value;
}
/**
* Gets the value of the 'firstName' field.
* @return The value of the 'firstName' field.
*/
public java.lang.CharSequence getFirstName() {
return firstName;
}
/**
* Sets the value of the 'firstName' field.
* @param value the value to set.
*/
public void setFirstName(java.lang.CharSequence value) {
this.firstName = value;
}
/**
* Gets the value of the 'lastName' field.
* @return The value of the 'lastName' field.
*/
public java.lang.CharSequence getLastName() {
return lastName;
}
/**
* Sets the value of the 'lastName' field.
* @param value the value to set.
*/
public void setLastName(java.lang.CharSequence value) {
this.lastName = value;
}
/**
* Gets the value of the 'email' field.
* @return The value of the 'email' field.
*/
public java.lang.CharSequence getEmail() {
return email;
}
/**
* Sets the value of the 'email' field.
* @param value the value to set.
*/
public void setEmail(java.lang.CharSequence value) {
this.email = value;
}
/**
* Gets the value of the 'phone' field.
* @return The value of the 'phone' field.
*/
public java.lang.CharSequence getPhone() {
return phone;
}
/**
* Sets the value of the 'phone' field.
* @param value the value to set.
*/
public void setPhone(java.lang.CharSequence value) {
this.phone = value;
}
/**
* Creates a new Customer RecordBuilder.
* @return A new Customer RecordBuilder
*/
public static com.kafkastream.model.Customer.Builder newBuilder() {
return new com.kafkastream.model.Customer.Builder();
}
/**
* Creates a new Customer RecordBuilder by copying an existing Builder.
* @param other The existing builder to copy.
* @return A new Customer RecordBuilder
*/
public static com.kafkastream.model.Customer.Builder newBuilder(com.kafkastream.model.Customer.Builder other) {
return new com.kafkastream.model.Customer.Builder(other);
}
/**
* Creates a new Customer RecordBuilder by copying an existing Customer instance.
* @param other The existing instance to copy.
* @return A new Customer RecordBuilder
*/
public static com.kafkastream.model.Customer.Builder newBuilder(com.kafkastream.model.Customer other) {
return new com.kafkastream.model.Customer.Builder(other);
}
/**
* RecordBuilder for Customer instances.
*/
public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Customer>
implements org.apache.avro.data.RecordBuilder<Customer> {
private java.lang.CharSequence customerId;
private java.lang.CharSequence firstName;
private java.lang.CharSequence lastName;
private java.lang.CharSequence email;
private java.lang.CharSequence phone;
/** Creates a new Builder */
private Builder() {
super(SCHEMA$);
}
/**
* Creates a Builder by copying an existing Builder.
* @param other The existing Builder to copy.
*/
private Builder(com.kafkastream.model.Customer.Builder other) {
super(other);
if (isValidValue(fields()[0], other.customerId)) {
this.customerId = data().deepCopy(fields()[0].schema(), other.customerId);
fieldSetFlags()[0] = true;
}
if (isValidValue(fields()[1], other.firstName)) {
this.firstName = data().deepCopy(fields()[1].schema(), other.firstName);
fieldSetFlags()[1] = true;
}
if (isValidValue(fields()[2], other.lastName)) {
this.lastName = data().deepCopy(fields()[2].schema(), other.lastName);
fieldSetFlags()[2] = true;
}
if (isValidValue(fields()[3], other.email)) {
this.email = data().deepCopy(fields()[3].schema(), other.email);
fieldSetFlags()[3] = true;
}
if (isValidValue(fields()[4], other.phone)) {
this.phone = data().deepCopy(fields()[4].schema(), other.phone);
fieldSetFlags()[4] = true;
}
}
/**
* Creates a Builder by copying an existing Customer instance
* @param other The existing instance to copy.
*/
private Builder(com.kafkastream.model.Customer other) {
super(SCHEMA$);
if (isValidValue(fields()[0], other.customerId)) {
this.customerId = data().deepCopy(fields()[0].schema(), other.customerId);
fieldSetFlags()[0] = true;
}
if (isValidValue(fields()[1], other.firstName)) {
this.firstName = data().deepCopy(fields()[1].schema(), other.firstName);
fieldSetFlags()[1] = true;
}
if (isValidValue(fields()[2], other.lastName)) {
this.lastName = data().deepCopy(fields()[2].schema(), other.lastName);
fieldSetFlags()[2] = true;
}
if (isValidValue(fields()[3], other.email)) {
this.email = data().deepCopy(fields()[3].schema(), other.email);
fieldSetFlags()[3] = true;
}
if (isValidValue(fields()[4], other.phone)) {
this.phone = data().deepCopy(fields()[4].schema(), other.phone);
fieldSetFlags()[4] = true;
}
}
/**
* Gets the value of the 'customerId' field.
* @return The value.
*/
public java.lang.CharSequence getCustomerId() {
return customerId;
}
/**
* Sets the value of the 'customerId' field.
* @param value The value of 'customerId'.
* @return This builder.
*/
public com.kafkastream.model.Customer.Builder setCustomerId(java.lang.CharSequence value) {
validate(fields()[0], value);
this.customerId = value;
fieldSetFlags()[0] = true;
return this;
}
/**
* Checks whether the 'customerId' field has been set.
* @return True if the 'customerId' field has been set, false otherwise.
*/
public boolean hasCustomerId() {
return fieldSetFlags()[0];
}
/**
* Clears the value of the 'customerId' field.
* @return This builder.
*/
public com.kafkastream.model.Customer.Builder clearCustomerId() {
customerId = null;
fieldSetFlags()[0] = false;
return this;
}
/**
* Gets the value of the 'firstName' field.
* @return The value.
*/
public java.lang.CharSequence getFirstName() {
return firstName;
}
/**
* Sets the value of the 'firstName' field.
* @param value The value of 'firstName'.
* @return This builder.
*/
public com.kafkastream.model.Customer.Builder setFirstName(java.lang.CharSequence value) {
validate(fields()[1], value);
this.firstName = value;
fieldSetFlags()[1] = true;
return this;
}
/**
* Checks whether the 'firstName' field has been set.
* @return True if the 'firstName' field has been set, false otherwise.
*/
public boolean hasFirstName() {
return fieldSetFlags()[1];
}
/**
* Clears the value of the 'firstName' field.
* @return This builder.
*/
public com.kafkastream.model.Customer.Builder clearFirstName() {
firstName = null;
fieldSetFlags()[1] = false;
return this;
}
/**
* Gets the value of the 'lastName' field.
* @return The value.
*/
public java.lang.CharSequence getLastName() {
return lastName;
}
/**
* Sets the value of the 'lastName' field.
* @param value The value of 'lastName'.
* @return This builder.
*/
public com.kafkastream.model.Customer.Builder setLastName(java.lang.CharSequence value) {
validate(fields()[2], value);
this.lastName = value;
fieldSetFlags()[2] = true;
return this;
}
/**
* Checks whether the 'lastName' field has been set.
* @return True if the 'lastName' field has been set, false otherwise.
*/
public boolean hasLastName() {
return fieldSetFlags()[2];
}
/**
* Clears the value of the 'lastName' field.
* @return This builder.
*/
public com.kafkastream.model.Customer.Builder clearLastName() {
lastName = null;
fieldSetFlags()[2] = false;
return this;
}
/**
* Gets the value of the 'email' field.
* @return The value.
*/
public java.lang.CharSequence getEmail() {
return email;
}
/**
* Sets the value of the 'email' field.
* @param value The value of 'email'.
* @return This builder.
*/
public com.kafkastream.model.Customer.Builder setEmail(java.lang.CharSequence value) {
validate(fields()[3], value);
this.email = value;
fieldSetFlags()[3] = true;
return this;
}
/**
* Checks whether the 'email' field has been set.
* @return True if the 'email' field has been set, false otherwise.
*/
public boolean hasEmail() {
return fieldSetFlags()[3];
}
/**
* Clears the value of the 'email' field.
* @return This builder.
*/
public com.kafkastream.model.Customer.Builder clearEmail() {
email = null;
fieldSetFlags()[3] = false;
return this;
}
/**
* Gets the value of the 'phone' field.
* @return The value.
*/
public java.lang.CharSequence getPhone() {
return phone;
}
/**
* Sets the value of the 'phone' field.
* @param value The value of 'phone'.
* @return This builder.
*/
public com.kafkastream.model.Customer.Builder setPhone(java.lang.CharSequence value) {
validate(fields()[4], value);
this.phone = value;
fieldSetFlags()[4] = true;
return this;
}
/**
* Checks whether the 'phone' field has been set.
* @return True if the 'phone' field has been set, false otherwise.
*/
public boolean hasPhone() {
return fieldSetFlags()[4];
}
/**
* Clears the value of the 'phone' field.
* @return This builder.
*/
public com.kafkastream.model.Customer.Builder clearPhone() {
phone = null;
fieldSetFlags()[4] = false;
return this;
}
@Override
@SuppressWarnings("unchecked")
public Customer build() {
try {
Customer record = new Customer();
record.customerId = fieldSetFlags()[0] ? this.customerId : (java.lang.CharSequence) defaultValue(fields()[0]);
record.firstName = fieldSetFlags()[1] ? this.firstName : (java.lang.CharSequence) defaultValue(fields()[1]);
record.lastName = fieldSetFlags()[2] ? this.lastName : (java.lang.CharSequence) defaultValue(fields()[2]);
record.email = fieldSetFlags()[3] ? this.email : (java.lang.CharSequence) defaultValue(fields()[3]);
record.phone = fieldSetFlags()[4] ? this.phone : (java.lang.CharSequence) defaultValue(fields()[4]);
return record;
} catch (java.lang.Exception e) {
throw new org.apache.avro.AvroRuntimeException(e);
}
}
}
@SuppressWarnings("unchecked")
private static final org.apache.avro.io.DatumWriter<Customer>
WRITER$ = (org.apache.avro.io.DatumWriter<Customer>)MODEL$.createDatumWriter(SCHEMA$);
@Override public void writeExternal(java.io.ObjectOutput out)
throws java.io.IOException {
WRITER$.write(this, SpecificData.getEncoder(out));
}
@SuppressWarnings("unchecked")
private static final org.apache.avro.io.DatumReader<Customer>
READER$ = (org.apache.avro.io.DatumReader<Customer>)MODEL$.createDatumReader(SCHEMA$);
@Override public void readExternal(java.io.ObjectInput in)
throws java.io.IOException {
READER$.read(this, SpecificData.getDecoder(in));
}
}
我希望这 post 对某人有所帮助。