在 Spring 云数据流中实施 Avro
Implementing Avro in Spring Cloud Dataflow
我正在尝试实现一个 Spring Cloud Dataflow 流,它从数据库中读取记录,将这些数据传递给处理器,处理器将其转换为 Avro 模式,然后将其传递给接收器应用程序使用。
我有数据从 SQL 数据库流向我的源应用程序并通过 Kafka 绑定器毫无问题地传递数据 我 运行 遇到了从处理器到带有 Avro 的 Sink 应用程序 serializing/deserializing。
我创建了一个名为 ech.avsc 的 avro 架构,并使用处理器中的 avro-maven-plugin 为其生成了一个名为 EchRecord 的 class。
我已将以下依赖项添加到处理器和接收器的 pom
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
<version>1.2.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.2</version>
</dependency>
我已经将处理器的属性设置为
spring.cloud.stream.bindings.output.contentType=application/*+avro
spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled=true
spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990
在接收器端,属性看起来像
spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990
处理器应用程序代码如下所示:
@EnableBinding(Processor.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public class EchProcessorApplication {
private static Logger logger = LoggerFactory.getLogger(EchProcessorApplication.class);
public static void main(String[] args) {
SpringApplication.run(EchProcessorApplication.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public EchRecord transform(List<Map<String, Object>> record) {
return EchRecord.newBuilder()
.setCallId(11111).build();;
}
在 Sink 端,代码如下所示:
@EnableBinding(Sink.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public class AvroLoggerApplication {
private static Logger LOGGER = LoggerFactory.getLogger(AvroLoggerApplication.class);
public static void main(String[] args) {
SpringApplication.run(AvroLoggerApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void logHandler(Object data) {
LOGGER.info("data='{}'", data.toString());
LOGGER.info("class='{}'", data.getClass());
}
}
我有一个 Spring 架构注册表服务器 运行 并且两个应用程序都可以访问它,我可以在查询注册表时看到该架构已传送到服务器。
我可以看到我是否在接收器应用程序上启用了调试日志记录,即在接收到的消息中正确设置了 contentType:contentType=application/vnd.echrecord.v1+avro
在 Sink 应用程序中,我设置了一个带有 @StreamListener 注释的方法来检索接受对象并打印出数据和 class 类型的消息,它似乎正在检索字节数组。
我如何着手更改 Sink 应用程序的代码以将 Avro 消息反序列化为我可以从中检索设置数据的内容?
这里有几件事要尝试。在生产方面,由于您的类型已经是 Avro 类型(SpecificRecord 或 GenericRecord),因此您不需要 dynamicSchemaGeneration
标志,这适用于基于反射的编写器,主要用于测试,因为它对性能有影响。
由于您的接收器可以看到您发布的正确类型,您现在需要的是将您的类型放在接收器上。因此,例如在接收器上添加类型并使用正确的类型注释方法:EchRecord
将为您提供正确的类型。
您也可以将其设置为 GenericRecord
以便能够像使用对象容器一样访问它 record.get(<propertyname>)
我正在尝试实现一个 Spring Cloud Dataflow 流,它从数据库中读取记录,将这些数据传递给处理器,处理器将其转换为 Avro 模式,然后将其传递给接收器应用程序使用。
我有数据从 SQL 数据库流向我的源应用程序并通过 Kafka 绑定器毫无问题地传递数据 我 运行 遇到了从处理器到带有 Avro 的 Sink 应用程序 serializing/deserializing。
我创建了一个名为 ech.avsc 的 avro 架构,并使用处理器中的 avro-maven-plugin 为其生成了一个名为 EchRecord 的 class。
我已将以下依赖项添加到处理器和接收器的 pom
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
<version>1.2.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.2</version>
</dependency>
我已经将处理器的属性设置为
spring.cloud.stream.bindings.output.contentType=application/*+avro
spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled=true
spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990
在接收器端,属性看起来像
spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990
处理器应用程序代码如下所示:
@EnableBinding(Processor.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public class EchProcessorApplication {
private static Logger logger = LoggerFactory.getLogger(EchProcessorApplication.class);
public static void main(String[] args) {
SpringApplication.run(EchProcessorApplication.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public EchRecord transform(List<Map<String, Object>> record) {
return EchRecord.newBuilder()
.setCallId(11111).build();;
}
在 Sink 端,代码如下所示:
@EnableBinding(Sink.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public class AvroLoggerApplication {
private static Logger LOGGER = LoggerFactory.getLogger(AvroLoggerApplication.class);
public static void main(String[] args) {
SpringApplication.run(AvroLoggerApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void logHandler(Object data) {
LOGGER.info("data='{}'", data.toString());
LOGGER.info("class='{}'", data.getClass());
}
}
我有一个 Spring 架构注册表服务器 运行 并且两个应用程序都可以访问它,我可以在查询注册表时看到该架构已传送到服务器。
我可以看到我是否在接收器应用程序上启用了调试日志记录,即在接收到的消息中正确设置了 contentType:contentType=application/vnd.echrecord.v1+avro
在 Sink 应用程序中,我设置了一个带有 @StreamListener 注释的方法来检索接受对象并打印出数据和 class 类型的消息,它似乎正在检索字节数组。
我如何着手更改 Sink 应用程序的代码以将 Avro 消息反序列化为我可以从中检索设置数据的内容?
这里有几件事要尝试。在生产方面,由于您的类型已经是 Avro 类型(SpecificRecord 或 GenericRecord),因此您不需要 dynamicSchemaGeneration
标志,这适用于基于反射的编写器,主要用于测试,因为它对性能有影响。
由于您的接收器可以看到您发布的正确类型,您现在需要的是将您的类型放在接收器上。因此,例如在接收器上添加类型并使用正确的类型注释方法:EchRecord
将为您提供正确的类型。
您也可以将其设置为 GenericRecord
以便能够像使用对象容器一样访问它 record.get(<propertyname>)