在 Spring Cloud Dataflow 中激活 Avro 消息转换器

Activating Avro message converter in Spring Cloud Dataflow

我正在 Spring Cloud Dataflow 上实现一个流应用程序。我想使用基于 Avro 的模式注册表客户端进行序列化和模式控制。 我的基本目标是为 Source 应用程序提供一些外部数据,将其转换为准备好的基于 avro 的模式并将其发送到仅接受此模式的 Sink 应用程序。 我想使用来自外部模式注册表服务器的模式,而不是模式的文件版本。 我的代码如下所示:

@EnableBinding(Source.class)
@EnableSchemaRegistryClient
public class DisSampleSource {

    private final DisSampleSourceProperties properties;

    @Inject
    public DisSampleSource(DisSampleSourceProperties properties) {
        this.properties = properties;
    }

    @InboundChannelAdapter(Source.OUTPUT)
    public String feed() throws IOException {
        if (!Paths.get(properties.getPath()).toFile().exists()) {
            throw new InvalidPathException(this.properties.getPath(),
                    "The file does not exists or is of not proper type.");
        }
        return new String(Files.readAllBytes(Paths.get(properties.getPath())), StandardCharsets.UTF_8);
    }
}

POM:

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-schema</artifactId>
        <version>1.1.1.RELEASE</version>
    </dependency>

在启动应用程序期间,我正在通过 属性:

 --spring.cloud.stream.bindings.output.contentType=application/foo.bar.v1+avro

目前,应用程序无法启动,出现以下异常:

2017-02-13 16:25:30.430  WARN 2444 --- [           main] ationConfigEmbeddedWebApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'disSampleSource' defined in URL [jar:file:/D:/git/dis/sample/source/target/dis-sample-source-1.0.0-SNAPSHOT.jar!/BOOT-INF/classes!/com/atsisa/bit/dis/sample/DisSampleSource.class]: Initialization of bean failed; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'org.springframework.cloud.stream.config.ChannelBindingAutoConfiguration': Unsatisfied dependency expressed through field 'adapters'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.cloud.stream.messaging.Source': Invocation of init method failed; nested exception is org.springframework.cloud.stream.converter.ConversionException: No message converter is registered for application/foo.bar.v1+avro
2017-02-13 16:25:30.440  INFO 2444 --- [           main] o.apache.catalina.core.StandardService   : Stopping service Tomcat
2017-02-13 16:25:30.480  INFO 2444 --- [           main] utoConfigurationReportLoggingInitializer :

Error starting ApplicationContext. To display the auto-configuration report re-run your application with 'debug' enabled.
2017-02-13 16:25:30.485 ERROR 2444 --- [           main] o.s.boot.SpringApplication               : Application startup failed

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'disSampleSource' defined in URL [jar:file:/D:/git/dis/sample/source/target/dis-sample-source-1.0.0-SNAPSHOT.jar!/BOOT-INF/classes!/com/atsisa/bit/dis/sample/DisSampleSource.class]: Initialization of bean failed; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'org.springframework.cloud.stream.config.ChannelBindingAutoConfiguration': Unsatisfied dependency expressed through field 'adapters'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.cloud.stream.messaging.Source': Invocation of init method failed; nested exception is org.springframework.cloud.stream.converter.ConversionException: No message converter is registered for application/foo.bar.v1+avro
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:562) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:482) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
        at org.springframework.beans.factory.support.AbstractBeanFactory.getObject(AbstractBeanFactory.java:306) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
        at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
        at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
        at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
        at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:754) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
        at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:866) ~[spring-context-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]

我做错了什么?

Avro 是 Spring Cloud Stream Schema 的可选依赖项(因为其目的是在将来支持其他格式。为了激活模式支持,您应该简单地添加

 <dependency>
     <groupId>org.apache.avro</groupId>
     <artifactId>avro</artifactId>
     <version>1.8.1</version>
 </dependency>

到项目。