Storm-jms Spout 收集 Avro 消息并向下发送?

Storm-jms Spout collecting Avro messages and sending down stream?

我是 Avro 格式的新手。我正在尝试使用 Storm-Jms spout 从 JMS 队列收集 Avro 消息,并使用 hdfs bolt 将它们发送到 hdfs。

队列正在发送 avro,但我无法使用 HDFS BOLT 以 avro 格式获取它们。

如何正确收集 avro 消息并将它们发送到下游,而不会在 hdfs 中出现编码错误。

现有的 HDFS Bolt 不支持写入 avro 文件,我们需要通过进行以下更改来克服这个问题。在此示例代码中,我使用从我的 spout 获取 JMS 消息并将这些 JMS 字节消息转换为 AVRO 并将它们发送到 HDFS。

此代码可作为修改AbstractHdfsBolt中方法的示例

public void execute(Tuple tuple) {          
        try {               
            long length = bytesMessage.getBodyLength();
            byte[] bytes = new byte[(int)length];
            ///////////////////////////////////////
            bytesMessage.readBytes(bytes);
            String replyMessage = new String(bytes, "UTF-8");

            datumReader = new SpecificDatumReader<IndexedRecord>(schema);
            decoder = DecoderFactory.get().binaryDecoder(bytes, null);

            result = datumReader.read(null, decoder);                               
            synchronized (this.writeLock) {                 
                dataFileWriter.append(result);                                      
                dataFileWriter.sync();
                this.offset += bytes.length;                    
               if (this.syncPolicy.mark(tuple, this.offset)) {
                   if (this.out instanceof HdfsDataOutputStream) {
                        ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
                    } else {
                        this.out.hsync();
                        this.out.flush();
                    }
                    this.syncPolicy.reset();
                }
               dataFileWriter.flush();
            }

            if(this.rotationPolicy.mark(tuple, this.offset)){
                rotateOutputFile(); // synchronized
                this.offset = 0;
                this.rotationPolicy.reset();
            }
        } catch (IOException | JMSException e) {
            LOG.warn("write/sync failed.", e);
            this.collector.fail(tuple);
        } 
    }

@Override
void closeOutputFile() throws IOException {
    this.out.close();
}

@Override
Path createOutputFile() throws IOException {
    Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
    this.out = this.fs.create(path);
    dataFileWriter.create(schema, out);
    return path;
}

@Override
void doPrepare(Map conf, TopologyContext topologyContext,OutputCollector collector) throws IOException {
    // TODO Auto-generated method stub
     LOG.info("Preparing HDFS Bolt...");
     try {

            schema = new Schema.Parser().parse(new File("/home/*******/********SchemafileName.avsc"));
        } catch (IOException e1) {              
            e1.printStackTrace();
        }
     this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
     datumWriter = new SpecificDatumWriter<IndexedRecord>(schema);
     dataFileWriter = new DataFileWriter<IndexedRecord>(datumWriter);
     JMSAvroUtils JASV = new JMSAvroUtils();         
}