flink:从kafka获取字节[]数据
flink: getting byte[] data from kafka
我正在使用 flink-1.0-SNAPSHOT 从 kafka 使用数据。数据以 Snappy compressed byte[] 的形式传入,并传递给 thrift 供以后使用。
当我使用 flink 检索数据时,数据已损坏或处理不当,无法解压缩。代码来源于this样本,如下:
DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer081<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));
messageStream.rebalance().map(new MapFunction<String, String>() {
@Override public String map(String value) throws Exception {
boolean bvalid = Snappy.isValidCompressedBuffer(value.getBytes());
});
isValidCompressedBufferreturns每次都是假的。
众所周知,通过其他途径使用数据时数据是好的。
我错过了什么?
解决方案:
我发布这个是因为我找不到任何使用 RawSchema
的示例。
public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// parse user parameters
ParameterTool parameterTool = ParameterTool.fromArgs(args);
DataStream<byte[]> dataStream = env.addSource(new FlinkKafkaConsumer081<>(parameterTool.getRequired("topic"), new RawSchema(), parameterTool.getProperties()));
dataStream.map(new MapFunction<byte[], Object>() {
@Override
public Object map(byte[] bytes) throws Exception {
boolean bvali = Snappy.isValidCompressedBuffer(bytes);
});
return 0;
}
}).print();
env.execute();
}
将字节消息读取为字符串是不正确的。
您应该按原样读取字节然后解压缩:
public Object map(byte[] bytes) throws Exception {
boolean bvalid = Snappy.isValidCompressedBuffer(bytes);
...
我正在使用 flink-1.0-SNAPSHOT 从 kafka 使用数据。数据以 Snappy compressed byte[] 的形式传入,并传递给 thrift 供以后使用。
当我使用 flink 检索数据时,数据已损坏或处理不当,无法解压缩。代码来源于this样本,如下:
DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer081<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));
messageStream.rebalance().map(new MapFunction<String, String>() {
@Override public String map(String value) throws Exception {
boolean bvalid = Snappy.isValidCompressedBuffer(value.getBytes());
});
isValidCompressedBufferreturns每次都是假的。
众所周知,通过其他途径使用数据时数据是好的。
我错过了什么?
解决方案:
我发布这个是因为我找不到任何使用 RawSchema
的示例。
public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// parse user parameters
ParameterTool parameterTool = ParameterTool.fromArgs(args);
DataStream<byte[]> dataStream = env.addSource(new FlinkKafkaConsumer081<>(parameterTool.getRequired("topic"), new RawSchema(), parameterTool.getProperties()));
dataStream.map(new MapFunction<byte[], Object>() {
@Override
public Object map(byte[] bytes) throws Exception {
boolean bvali = Snappy.isValidCompressedBuffer(bytes);
});
return 0;
}
}).print();
env.execute();
}
将字节消息读取为字符串是不正确的。 您应该按原样读取字节然后解压缩:
public Object map(byte[] bytes) throws Exception {
boolean bvalid = Snappy.isValidCompressedBuffer(bytes);
...