Storm-jms Spout 收集 Avro 消息并向下发送?
Storm-jms Spout collecting Avro messages and sending down stream?
我是 Avro 格式的新手。我正在尝试使用 Storm-Jms spout 从 JMS 队列收集 Avro 消息,并使用 hdfs bolt 将它们发送到 hdfs。
队列正在发送 avro,但我无法使用 HDFS BOLT 以 avro 格式获取它们。
如何正确收集 avro 消息并将它们发送到下游,而不会在 hdfs 中出现编码错误。
现有的 HDFS Bolt 不支持写入 avro 文件,我们需要通过进行以下更改来克服这个问题。在此示例代码中,我使用从我的 spout 获取 JMS 消息并将这些 JMS 字节消息转换为 AVRO 并将它们发送到 HDFS。
此代码可作为修改AbstractHdfsBolt中方法的示例
public void execute(Tuple tuple) {
try {
long length = bytesMessage.getBodyLength();
byte[] bytes = new byte[(int)length];
///////////////////////////////////////
bytesMessage.readBytes(bytes);
String replyMessage = new String(bytes, "UTF-8");
datumReader = new SpecificDatumReader<IndexedRecord>(schema);
decoder = DecoderFactory.get().binaryDecoder(bytes, null);
result = datumReader.read(null, decoder);
synchronized (this.writeLock) {
dataFileWriter.append(result);
dataFileWriter.sync();
this.offset += bytes.length;
if (this.syncPolicy.mark(tuple, this.offset)) {
if (this.out instanceof HdfsDataOutputStream) {
((HdfsDataOutputStream) this.out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
} else {
this.out.hsync();
this.out.flush();
}
this.syncPolicy.reset();
}
dataFileWriter.flush();
}
if(this.rotationPolicy.mark(tuple, this.offset)){
rotateOutputFile(); // synchronized
this.offset = 0;
this.rotationPolicy.reset();
}
} catch (IOException | JMSException e) {
LOG.warn("write/sync failed.", e);
this.collector.fail(tuple);
}
}
@Override
void closeOutputFile() throws IOException {
this.out.close();
}
@Override
Path createOutputFile() throws IOException {
Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
this.out = this.fs.create(path);
dataFileWriter.create(schema, out);
return path;
}
@Override
void doPrepare(Map conf, TopologyContext topologyContext,OutputCollector collector) throws IOException {
// TODO Auto-generated method stub
LOG.info("Preparing HDFS Bolt...");
try {
schema = new Schema.Parser().parse(new File("/home/*******/********SchemafileName.avsc"));
} catch (IOException e1) {
e1.printStackTrace();
}
this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
datumWriter = new SpecificDatumWriter<IndexedRecord>(schema);
dataFileWriter = new DataFileWriter<IndexedRecord>(datumWriter);
JMSAvroUtils JASV = new JMSAvroUtils();
}
我是 Avro 格式的新手。我正在尝试使用 Storm-Jms spout 从 JMS 队列收集 Avro 消息,并使用 hdfs bolt 将它们发送到 hdfs。
队列正在发送 avro,但我无法使用 HDFS BOLT 以 avro 格式获取它们。
如何正确收集 avro 消息并将它们发送到下游,而不会在 hdfs 中出现编码错误。
现有的 HDFS Bolt 不支持写入 avro 文件,我们需要通过进行以下更改来克服这个问题。在此示例代码中,我使用从我的 spout 获取 JMS 消息并将这些 JMS 字节消息转换为 AVRO 并将它们发送到 HDFS。
此代码可作为修改AbstractHdfsBolt中方法的示例
public void execute(Tuple tuple) {
try {
long length = bytesMessage.getBodyLength();
byte[] bytes = new byte[(int)length];
///////////////////////////////////////
bytesMessage.readBytes(bytes);
String replyMessage = new String(bytes, "UTF-8");
datumReader = new SpecificDatumReader<IndexedRecord>(schema);
decoder = DecoderFactory.get().binaryDecoder(bytes, null);
result = datumReader.read(null, decoder);
synchronized (this.writeLock) {
dataFileWriter.append(result);
dataFileWriter.sync();
this.offset += bytes.length;
if (this.syncPolicy.mark(tuple, this.offset)) {
if (this.out instanceof HdfsDataOutputStream) {
((HdfsDataOutputStream) this.out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
} else {
this.out.hsync();
this.out.flush();
}
this.syncPolicy.reset();
}
dataFileWriter.flush();
}
if(this.rotationPolicy.mark(tuple, this.offset)){
rotateOutputFile(); // synchronized
this.offset = 0;
this.rotationPolicy.reset();
}
} catch (IOException | JMSException e) {
LOG.warn("write/sync failed.", e);
this.collector.fail(tuple);
}
}
@Override
void closeOutputFile() throws IOException {
this.out.close();
}
@Override
Path createOutputFile() throws IOException {
Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
this.out = this.fs.create(path);
dataFileWriter.create(schema, out);
return path;
}
@Override
void doPrepare(Map conf, TopologyContext topologyContext,OutputCollector collector) throws IOException {
// TODO Auto-generated method stub
LOG.info("Preparing HDFS Bolt...");
try {
schema = new Schema.Parser().parse(new File("/home/*******/********SchemafileName.avsc"));
} catch (IOException e1) {
e1.printStackTrace();
}
this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
datumWriter = new SpecificDatumWriter<IndexedRecord>(schema);
dataFileWriter = new DataFileWriter<IndexedRecord>(datumWriter);
JMSAvroUtils JASV = new JMSAvroUtils();
}