如何为每个批次重置 Flume 的自定义接收器 class 中的变量

How to reset a variable in Flume's custom sink class for every batch

我有一个 flume 进程,它从 spooldir 上的文件中读取数据并将数据加载到 MySQL 数据库中。将有多种类型的文件可以由同一个 flume 进程处理。

我创建了一个自定义接收器 java class(扩展 AbstractSink),它在 initial/first 读取后更新局部变量 (sInterfaceType) 以确定文件中的数据格式. 一旦文件处理完成,我必须重置它,以便它必须从识别下一个 batch/interface 文件开始。

我尝试在 stop() 中执行,但没有帮助。有人这样做过吗?

我的水槽 class 看起来像这样:

public class MyFlumeSink2 extends AbstractSink implements Configurable {

 private String sInterfaceType; //tells file format of current load

 public MyFlumeSink2() {
  //my initialization of variables
 }

 public void configure(Context context) {
  //read context variables
 }

 public void start() {
  //create db connection
 }

 @Override
 public void stop() {
  //destroy connection
  sInterfaceType = ""; //This doesn't help me
  super.stop();
 }

 public Status process() throws EventDeliveryException {
   Channel channel = getChannel();
   Transaction transaction = channel.getTransaction();

   if((sInterfaceType=="" || sInterfaceType==null))
   {
    //Read first line & set sInterfaceType
   }else
     //Insert data in MySQL

   transaction.commit();
 }
}

我们必须手动决定它是哪个事件,没有为每个新文件调用专门的方法。

我修改了我的代码以读取事件行并根据第一个元素设置 InterfaceType。我的代码如下所示:

public Status process() throws EventDeliveryException {
        //....other code...

            sEvtBody = new String(event.getBody());
            sFields = sEvtBody.split(",");

            //check first field to know record type
            enumRec = RecordType.valueOf( checkRecordType(sFields[0].toUpperCase()) );
            switch(enumRec)
            {
                case CUST_ID:
                    sInterfaceType = "T_CUST";
                    bHeader = true;
                    break;
                case TXN_ID:
                    sInterfaceType = "T_CUST_TXNS";
                    bHeader = true;
                    break;
                default:
                    bHeader = false;
            }
            //insert if not header
            if(!bHeader)
            {

                if(sInterfaceType == "T_CUST")
                {
                    if(sFields.length == 14)
                        this.bInsertStatus = daoClass.insertHeader(sFields);
                    else
                        throw new Exception("INCORRECT_COLUMN_COUNT");
                }else if(sInterfaceType == "T_CUST_TXNS")
                {
                    if(sFields.length == 10)
                        this.bInsertStatus = daoClass.insertData(sFields);
                    else
                        throw new Exception("INCORRECT_COLUMN_COUNT");
                }

                //if(!bInsertStatus)
                //  logTransaction(sFields);
            }
            //....Other code....