Springxd 将 rabbitmq json 消息作为输出转换为自定义 spark 处理器模块中的 pojo

Springxd convert from rabbitmq json message as output to a pojo in a custom spark processor module

我正在尝试创建如下流:

rabbitmq | spark-custom-processor | file-sink

RabbitMq 从队列中读取消息,消息头类型为 application/json,代表我的 POJO class House。然后这些重新添加的消息应该被传输到我的 spark-custom-processor 并且在处理器完成他的工作之后,将结果存储在一个文件中。

问题是到达 spark-custom-processor 的数据类型是 Byte 而不是 application/json 消息转换为我的 POJO class House.

如果阅读解释了 Type convertion 的文档,它似乎应该自动完成对我的 POJO House 类型的转换,但事实并非如此。

我也试过使用:

rabbitmq | --inputputType=application/x-java-object;type=domain.MetricBean spark-custom-processor | file-sink to especify the convertion but the springXdShell comlains about a sintax error at ;

有没有办法做到这一点,还是我必须自己在 spark-custom-processor 中使用 ObjectMapper 进行映射?

你只需要用 ;.

引用字符串

我刚刚测试了这个,它运行良好...

xd:>stream create foo --definition "rabbit --outputType='application/x-java-object;type=foo.Foo' 
    | log" --deploy

.

public class Foo {

    private String foo;

    public String getFoo() {
        return foo;
    }

    public void setFoo(String foo) {
        this.foo = foo;
    }

    @Override
    public String toString() {
        return "Foo [foo=" + foo + "]";
    }

}

发表{"foo":"bar"}

2015-07-30T10:37:36-0400 1.2.0.RC1 INFO SimpleAsyncTaskExecutor-1 sink.foo - Foo [foo=bar]