Apache Flink: how to sink a Java ObjectNode as a JSON string?
This source takes JSON strings and turns them into Java ObjectNode instances:
    final DataStream<ObjectNode> inputStream = env
        .addSource(new RMQSource<ObjectNode>(
            connectionConfig,                 // config for the RabbitMQ connection
            "start",                          // name of the RabbitMQ queue to consume
            true,                             // use correlation ids; can be false if only at-least-once is required
            new JSONDeserializationSchema())) // deserialization schema to turn messages into Java objects
        .setParallelism(1);                   // non-parallel source is only required for exactly-once
How do I write them back out, going from Java ObjectNode to JSON string?
    stream.addSink(new RMQSink<ObjectNode>(
        connectionConfig,
        "stop",
        new JSONSerializationSchema()
    ));
JSONSerializationSchema does not exist, but I need something like it.
Use a custom SerializationSchema, like this:
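    stream.addSink(new RMQSink<ObjectNode>(
        connectionConfig,
        "stop",
        new SerializationSchema<ObjectNode>() {
            @Override
            public byte[] serialize(ObjectNode element) {
                // Name the charset explicitly; a bare getBytes() uses the JVM's platform default.
                return element.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
            }
        }
    ));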
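The encoding step inside serialize() can be tried on its own, without Flink or RabbitMQ on the classpath. The sketch below is only an illustration: `ToBytes` is a hypothetical stand-in for Flink's SerializationSchema, and a plain String stands in for the JSON text that ObjectNode.toString() is expected to return. It shows that naming UTF-8 on both sides gives a lossless round trip, including for non-ASCII characters.

```java
import java.nio.charset.StandardCharsets;

class JsonBytesDemo {
    /** Hypothetical stand-in for Flink's SerializationSchema<T>, so the demo runs without Flink. */
    interface ToBytes<T> {
        byte[] serialize(T element);
    }

    /** Encodes the element's string form as UTF-8, independent of the platform default charset. */
    static final ToBytes<Object> UTF8_SCHEMA =
            element -> element.toString().getBytes(StandardCharsets.UTF_8);

    public static void main(String[] args) {
        // Stand-in for the JSON text of an ObjectNode; \u00fc is the letter u with umlaut.
        String json = "{\"city\":\"Z\u00fcrich\"}";

        byte[] payload = UTF8_SCHEMA.serialize(json);
        String decoded = new String(payload, StandardCharsets.UTF_8);

        System.out.println(decoded.equals(json)); // true
        System.out.println(payload.length);       // 18 (17 characters, one of them 2 bytes in UTF-8)
    }
}
```

A consumer reading the "stop" queue would need to decode with the same charset for the round trip to hold.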