NiFi custom processor showing error "the local variable flowfile cannot be assigned"

I am trying to send the original flow file as input to the next processor, but I end up with an error. I am new to NiFi and have only some experience with Java.

public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    FlowFile flowfile = session.get();
    if (flowfile == null) {
        return;
    }

    ArrayList<String> headData = new ArrayList<String>();
    try {
        session.read(flowfile, new InputStreamCallback() {

            final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
            String query = " CREATE TABLE MODEL (";

            @SuppressWarnings("deprecation")
            public void process(InputStream inputStream) throws IOException {
                try {
                    OPCPackage pkg = OPCPackage.open(inputStream);
                    XSSFWorkbook workbook = new XSSFWorkbook(pkg);
                    workbook.getAllNames();
                    String dateheader = "date";
                    XSSFSheet sheetName = workbook.getSheetAt(0);
                    // Build the column list from the header row
                    Row row = sheetName.getRow(0);
                    for (Cell cell : row) {
                        switch (cell.getCellType()) {
                        case NUMERIC:
                            if (HSSFDateUtil.isCellDateFormatted(cell)) {
                                DataFormatter dataFormatter = new DataFormatter();
                                headData.add(dataFormatter.formatCellValue(cell));
                                query += dataFormatter.formatCellValue(cell) + " " + "INT";
                            } else {
                                headData.add(String.valueOf(cell.getNumericCellValue()));
                            }
                            break;
                        case STRING:
                            headData.add(cell.getStringCellValue());
                            if (cell.getStringCellValue().toLowerCase().contains(dateheader))
                                query += cell.getStringCellValue() + " " + "TIMESTAMP,";
                            else
                                query += cell.getStringCellValue() + " " + "VARCHAR(50),";
                            break;
                        case BOOLEAN:
                            headData.add(String.valueOf(cell.getBooleanCellValue()));
                            break;
                        default:
                            headData.add("");
                            break;
                        }
                    }
                    // Drop the trailing comma and close the column list
                    query = query.substring(0, query.length() - 1);
                    query += ")";
                    workbook.close();
                    final Connection con = dbcpService.getConnection();
                    try {
                        java.sql.PreparedStatement preparedStatement = con.prepareStatement(query);
                        preparedStatement.execute();
                        con.commit();
                        session.transfer(flowfile, REL_SUCCESS);
                    } catch (SQLException e) {
                        e.printStackTrace();
                        session.transfer(flowfile, REL_FAILURE);
                    }
                } catch (InvalidFormatException ife) {
                    getLogger().error("Only .xlsx Excel files are supported", ife);
                    throw new UnsupportedOperationException("Only .xlsx OOXML files are supported", ife);
                }
            }

        });
    } catch (RuntimeException ex) {
        getLogger().error("Failed to process incoming Excel document. " + ex.getMessage(), ex);
        FlowFile failedFlowFile = session.putAttribute(flowfile, testxlsqlProcessor.class.getName() + ".error", ex.getMessage());
    }
    final StringBuilder stringBuilder = new StringBuilder();
    flowfile = session.write(flowfile, new StreamCallback() {
        public void process(InputStream in, OutputStream out) throws IOException {
            stringBuilder.append(IOUtils.copy(in, out));
        }
    });

}
}

If I don't add the output stream, I get an exception saying that the transfer relationship is not specified.

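You don't want to transfer the flow file from inside the InputStreamCallback; that should happen after you have finished reading from the flow file. If you are not changing the content of the outgoing flow file, then you also don't need the StreamCallback and IOUtils.copy() part at the end; you can just transfer the original flow file. For the failure case, you can throw an IOException inside the InputStreamCallback that wraps the real exception, catch it on the outside, and then transfer the original flow file to failure. If there is no exception, you can transfer the original flow file to success.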
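
The control flow described above can be sketched in plain Java. This is only an illustration under stated assumptions, not NiFi code: the nested InputStreamCallback interface, the ProcessException class, the read helper, and parseHeader are hypothetical stand-ins so that the example compiles without the NiFi API. They mimic how I understand session.read to behave, namely that an IOException thrown by the callback reaches the caller as an unchecked exception. The returned strings stand in for session.transfer(flowfile, REL_SUCCESS) and session.transfer(flowfile, REL_FAILURE).

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

final class CallbackRoutingSketch {

    // Hypothetical stand-in for NiFi's InputStreamCallback.
    interface InputStreamCallback {
        void process(InputStream in) throws IOException;
    }

    // Hypothetical stand-in for NiFi's unchecked ProcessException.
    static final class ProcessException extends RuntimeException {
        ProcessException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical stand-in for session.read(flowfile, callback): passes the
    // content to the callback and rethrows any IOException unchecked.
    static void read(byte[] content, InputStreamCallback callback) {
        try (InputStream in = new ByteArrayInputStream(content)) {
            callback.process(in);
        } catch (IOException e) {
            throw new ProcessException("IOException thrown from callback", e);
        }
    }

    // Placeholder for the real parsing work (the Excel header in the question).
    static String parseHeader(InputStream in) throws IOException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        String line = reader.readLine();
        if (line == null || line.isEmpty()) {
            throw new IllegalArgumentException("no header row");
        }
        return line;
    }

    // The callback only reads; the routing decision is made after it returns.
    static String route(byte[] content) {
        final StringBuilder header = new StringBuilder();
        try {
            read(content, in -> {
                try {
                    header.append(parseHeader(in));
                } catch (IllegalArgumentException real) {
                    // Wrap the real exception so it can leave the callback.
                    throw new IOException("Could not parse content", real);
                }
            });
        } catch (ProcessException e) {
            return "failure"; // here: session.transfer(flowfile, REL_FAILURE)
        }
        return "success"; // here: session.transfer(flowfile, REL_SUCCESS)
    }

    public static void main(String[] args) {
        System.out.println(route("id,name".getBytes(StandardCharsets.UTF_8))); // prints success
        System.out.println(route(new byte[0])); // prints failure
    }
}
```

Because the flow file variable is never reassigned after the callback captures it, it stays effectively final, which is what the compiler requires for variables used inside an anonymous class or lambda.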