发送字节数组到storm kafka bolt
Send byte array to storm kafka bolt
我写了一个storm拓扑。我基本上想以字节数组的形式将 avro 模式中的元组发送到 kafka 主题。
我是这样设置螺栓的:
builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, byte[]>())
.fieldsGrouping(BOLT1, new Fields("key"));
这就是我转换为字节数组的方式
Schema schema = avroObject.getSchema();
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(ping, encoder);
encoder.flush();
byte[] message = out.toByteArray();
String key = new String(message, "UTF-8");
当我以下列方式发出元组时,我在 kafka 主题中看不到任何内容(将字节流发送到 kafka):
collector.emit(tuple, new Values(Obj.hashMD5(key), message));
但相反,如果我将字节数组转换为字符串,然后再转换为 kafka 主题,它会起作用:
如下所示:
builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, String>())
.fieldsGrouping(BOLT1, new Fields("key"));
collector.emit(tuple, new Values(Obj.hashMD5(key), key));
我做错了什么?如何使用 storm kafka bolt 将字节流发送到 kafka 主题?
您遇到问题是因为您的 MD5 哈希值不正确:
你说如果你把你的 bytearray 转换成一个 java String 它就起作用了:这是因为根据一个 String,MD5 的值是正确的。
collector.emit(tuple, new Values(Obj.hashMD5(key), key));
正如您所见,MD5 是在一个字符串参数上计算的,您发送与 MD5 对应的字符串:一切都很好!
但是如果你发送一个bytearray,你需要计算一个bytearray的MD5,结果是一个bytearray,而不是一个String。您的代码:
collector.emit(tuple, new Values(Obj.hashMD5(key), message));
不正确,因为 MD5 不对应于消息,而是对应于 UTF-8 中消息的转换值作为有损字符串(见下文)。
这里是 link 另一个关于以字节数组格式正确计算 MD5 的问题:
How can I generate an MD5 hash?
这是因为在 Java 中将 bytearray 转换为 String 是有损的(与 C 相反),并且您会在此过程中丢失数据,因为某些字节与 Java 编码中的字符不对应(你的数据中显然有其中一些)。
所以你的 KafkaBolt 应该是
KafkaBolt<byte[], byte[]>
我不知道在kafka storm中随你的bytearray一起发送一个bytearray MD5是否足够。如果不是,则必须使用字节数组和 java 字符串之间无损的编码,例如 BASE64:
Base64 Encoding in Java
您必须使用
将字节数组转换为 base64 字符串
KafkaBolt<String, String>
然后照常发送数据
collector.emit(tuple, new Values(Obj.hashMD5(keyInBase64), keyInBase64));
这也意味着当您从 kafka 获取数据时,它将是一个 base64 中的字符串,您必须对其进行解码才能取回字节数组。
希望对您有所帮助。
我写了一个storm拓扑。我基本上想以字节数组的形式将 avro 模式中的元组发送到 kafka 主题。
我是这样设置螺栓的:
builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, byte[]>())
.fieldsGrouping(BOLT1, new Fields("key"));
这就是我转换为字节数组的方式
Schema schema = avroObject.getSchema();
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(ping, encoder);
encoder.flush();
byte[] message = out.toByteArray();
String key = new String(message, "UTF-8");
当我以下列方式发出元组时,我在 kafka 主题中看不到任何内容(将字节流发送到 kafka):
collector.emit(tuple, new Values(Obj.hashMD5(key), message));
但相反,如果我将字节数组转换为字符串,然后再转换为 kafka 主题,它会起作用:
如下所示:
builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, String>())
.fieldsGrouping(BOLT1, new Fields("key"));
collector.emit(tuple, new Values(Obj.hashMD5(key), key));
我做错了什么?如何使用 storm kafka bolt 将字节流发送到 kafka 主题?
您遇到问题是因为您的 MD5 哈希值不正确:
你说如果你把你的 bytearray 转换成一个 java String 它就起作用了:这是因为根据一个 String,MD5 的值是正确的。
collector.emit(tuple, new Values(Obj.hashMD5(key), key));
正如您所见,MD5 是在一个字符串参数上计算的,您发送与 MD5 对应的字符串:一切都很好!
但是如果你发送一个bytearray,你需要计算一个bytearray的MD5,结果是一个bytearray,而不是一个String。您的代码:
collector.emit(tuple, new Values(Obj.hashMD5(key), message));
不正确,因为 MD5 不对应于消息,而是对应于 UTF-8 中消息的转换值作为有损字符串(见下文)。
这里是 link 另一个关于以字节数组格式正确计算 MD5 的问题:
How can I generate an MD5 hash?
这是因为在 Java 中将 bytearray 转换为 String 是有损的(与 C 相反),并且您会在此过程中丢失数据,因为某些字节与 Java 编码中的字符不对应(你的数据中显然有其中一些)。
所以你的 KafkaBolt 应该是
KafkaBolt<byte[], byte[]>
我不知道在kafka storm中随你的bytearray一起发送一个bytearray MD5是否足够。如果不是,则必须使用字节数组和 java 字符串之间无损的编码,例如 BASE64:
Base64 Encoding in Java
您必须使用
将字节数组转换为 base64 字符串KafkaBolt<String, String>
然后照常发送数据
collector.emit(tuple, new Values(Obj.hashMD5(keyInBase64), keyInBase64));
这也意味着当您从 kafka 获取数据时,它将是一个 base64 中的字符串,您必须对其进行解码才能取回字节数组。
希望对您有所帮助。