Springxd 将 rabbitmq json 消息作为输出转换为自定义 spark 处理器模块中的 pojo
Springxd convert from rabbitmq json message as output to a pojo in a custom spark processor module
我正在尝试创建如下流:
rabbitmq | spark-custom-processor | file-sink
RabbitMq 从队列中读取消息,消息头类型为 application/json,代表我的 POJO class House
。然后这些重新添加的消息应该被传输到我的 spark-custom-processor
并且在处理器完成他的工作之后,将结果存储在一个文件中。
问题是到达 spark-custom-processor
的数据类型是 Byte
而不是 application/json 消息转换为我的 POJO class House
.
如果阅读解释了 Type convertion 的文档,它似乎应该自动完成对我的 POJO House
类型的转换,但事实并非如此。
我也试过使用:
rabbitmq | --inputputType=application/x-java-object;type=domain.MetricBean spark-custom-processor | file-sink
to especify the convertion but the springXdShell
comlains about a sintax error at ;
有没有办法做到这一点,还是我必须自己在 spark-custom-processor
中使用 ObjectMapper
进行映射?
你只需要用 ;
.
引用字符串
我刚刚测试了这个,它运行良好...
xd:>stream create foo --definition "rabbit --outputType='application/x-java-object;type=foo.Foo'
| log" --deploy
.
public class Foo {
private String foo;
public String getFoo() {
return foo;
}
public void setFoo(String foo) {
this.foo = foo;
}
@Override
public String toString() {
return "Foo [foo=" + foo + "]";
}
}
发表{"foo":"bar"}
2015-07-30T10:37:36-0400 1.2.0.RC1 INFO SimpleAsyncTaskExecutor-1 sink.foo - Foo [foo=bar]
我正在尝试创建如下流:
rabbitmq | spark-custom-processor | file-sink
RabbitMq 从队列中读取消息,消息头类型为 application/json,代表我的 POJO class House
。然后这些重新添加的消息应该被传输到我的 spark-custom-processor
并且在处理器完成他的工作之后,将结果存储在一个文件中。
问题是到达 spark-custom-processor
的数据类型是 Byte
而不是 application/json 消息转换为我的 POJO class House
.
如果阅读解释了 Type convertion 的文档,它似乎应该自动完成对我的 POJO House
类型的转换,但事实并非如此。
我也试过使用:
rabbitmq | --inputputType=application/x-java-object;type=domain.MetricBean spark-custom-processor | file-sink to especify the convertion but the
springXdShell
comlains about a sintax error at;
有没有办法做到这一点,还是我必须自己在 spark-custom-processor
中使用 ObjectMapper
进行映射?
你只需要用 ;
.
我刚刚测试了这个,它运行良好...
xd:>stream create foo --definition "rabbit --outputType='application/x-java-object;type=foo.Foo'
| log" --deploy
.
public class Foo {
private String foo;
public String getFoo() {
return foo;
}
public void setFoo(String foo) {
this.foo = foo;
}
@Override
public String toString() {
return "Foo [foo=" + foo + "]";
}
}
发表{"foo":"bar"}
2015-07-30T10:37:36-0400 1.2.0.RC1 INFO SimpleAsyncTaskExecutor-1 sink.foo - Foo [foo=bar]