在 Spring 云数据流中实施 Avro

Implementing Avro in Spring Cloud Dataflow

我正在尝试实现一个 Spring Cloud Dataflow 流,它从数据库中读取记录,将这些数据传递给处理器,处理器将其转换为 Avro 模式,然后将其传递给接收器应用程序使用。

我有数据从 SQL 数据库流向我的源应用程序并通过 Kafka 绑定器毫无问题地传递数据 我 运行 遇到了从处理器到带有 Avro 的 Sink 应用程序 serializing/deserializing。

我创建了一个名为 ech.avsc 的 avro 架构,并使用处理器中的 avro-maven-plugin 为其生成了一个名为 EchRecord 的 class。

我已将以下依赖项添加到处理器和接收器的 pom

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-schema</artifactId>
    <version>1.2.2.RELEASE</version>
</dependency>
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.8.2</version>
</dependency>

我已经将处理器的属性设置为

spring.cloud.stream.bindings.output.contentType=application/*+avro
spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled=true
spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990

在接收器端,属性看起来像 spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990

处理器应用程序代码如下所示:

@EnableBinding(Processor.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public class EchProcessorApplication {

private static Logger logger = LoggerFactory.getLogger(EchProcessorApplication.class);

public static void main(String[] args) {
    SpringApplication.run(EchProcessorApplication.class, args);
}


@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public EchRecord transform(List<Map<String, Object>> record) {
    return EchRecord.newBuilder()
            .setCallId(11111).build();;
}

在 Sink 端,代码如下所示:

@EnableBinding(Sink.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public class AvroLoggerApplication {



    private static Logger LOGGER = LoggerFactory.getLogger(AvroLoggerApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(AvroLoggerApplication.class, args);
    }


    @StreamListener(Sink.INPUT)
    public void logHandler(Object data) {

        LOGGER.info("data='{}'", data.toString());
        LOGGER.info("class='{}'", data.getClass());


    }
}

我有一个 Spring 架构注册表服务器 运行 并且两个应用程序都可以访问它,我可以在查询注册表时看到该架构已传送到服务器。

我可以看到我是否在接收器应用程序上启用了调试日志记录,即在接收到的消息中正确设置了 contentType:contentType=application/vnd.echrecord.v1+avro

在 Sink 应用程序中,我设置了一个带有 @StreamListener 注释的方法来检索接受对象并打印出数据和 class 类型的消息,它似乎正在检索字节数组。

我如何着手更改 Sink 应用程序的代码以将 Avro 消息反序列化为我可以从中检索设置数据的内容?

这里有几件事要尝试。在生产方面,由于您的类型已经是 Avro 类型(SpecificRecord 或 GenericRecord),因此您不需要 dynamicSchemaGeneration 标志,这适用于基于反射的编写器,主要用于测试,因为它对性能有影响。

由于您的接收器可以看到您发布的正确类型,您现在需要的是将您的类型放在接收器上。因此,例如在接收器上添加类型并使用正确的类型注释方法:EchRecord 将为您提供正确的类型。

您也可以将其设置为 GenericRecord 以便能够像使用对象容器一样访问它 record.get(<propertyname>)