如何为每个批次重置 Flume 的自定义接收器 class 中的变量
How to reset a variable in Flume's custom sink class for every batch
我有一个 flume 进程,它从 spooldir 上的文件中读取数据并将数据加载到 MySQL 数据库中。将有多种类型的文件可以由同一个 flume 进程处理。
我创建了一个自定义接收器 java class(扩展 AbstractSink),它在 initial/first 读取后更新局部变量 (sInterfaceType) 以确定文件中的数据格式.
一旦文件处理完成,我必须重置它,以便它必须从识别下一个 batch/interface 文件开始。
我尝试在 stop() 中执行,但没有帮助。有人这样做过吗?
我的水槽 class 看起来像这样:
public class MyFlumeSink2 extends AbstractSink implements Configurable {
private String sInterfaceType; //tells file format of current load
public MyFlumeSink2() {
//my initialization of variables
}
public void configure(Context context) {
//read context variables
}
public void start() {
//create db connection
}
@Override
public void stop() {
//destroy connection
sInterfaceType = ""; //This doesn't help me
super.stop();
}
public Status process() throws EventDeliveryException {
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
if((sInterfaceType=="" || sInterfaceType==null))
{
//Read first line & set sInterfaceType
}else
//Insert data in MySQL
transaction.commit();
}
}
我们必须手动决定它是哪个事件,没有为每个新文件调用专门的方法。
我修改了我的代码以读取事件行并根据第一个元素设置 InterfaceType。我的代码如下所示:
public Status process() throws EventDeliveryException {
//....other code...
sEvtBody = new String(event.getBody());
sFields = sEvtBody.split(",");
//check first field to know record type
enumRec = RecordType.valueOf( checkRecordType(sFields[0].toUpperCase()) );
switch(enumRec)
{
case CUST_ID:
sInterfaceType = "T_CUST";
bHeader = true;
break;
case TXN_ID:
sInterfaceType = "T_CUST_TXNS";
bHeader = true;
break;
default:
bHeader = false;
}
//insert if not header
if(!bHeader)
{
if(sInterfaceType == "T_CUST")
{
if(sFields.length == 14)
this.bInsertStatus = daoClass.insertHeader(sFields);
else
throw new Exception("INCORRECT_COLUMN_COUNT");
}else if(sInterfaceType == "T_CUST_TXNS")
{
if(sFields.length == 10)
this.bInsertStatus = daoClass.insertData(sFields);
else
throw new Exception("INCORRECT_COLUMN_COUNT");
}
//if(!bInsertStatus)
// logTransaction(sFields);
}
//....Other code....
我有一个 flume 进程,它从 spooldir 上的文件中读取数据并将数据加载到 MySQL 数据库中。将有多种类型的文件可以由同一个 flume 进程处理。
我创建了一个自定义接收器 java class(扩展 AbstractSink),它在 initial/first 读取后更新局部变量 (sInterfaceType) 以确定文件中的数据格式. 一旦文件处理完成,我必须重置它,以便它必须从识别下一个 batch/interface 文件开始。
我尝试在 stop() 中执行,但没有帮助。有人这样做过吗?
我的水槽 class 看起来像这样:
public class MyFlumeSink2 extends AbstractSink implements Configurable {
private String sInterfaceType; //tells file format of current load
public MyFlumeSink2() {
//my initialization of variables
}
public void configure(Context context) {
//read context variables
}
public void start() {
//create db connection
}
@Override
public void stop() {
//destroy connection
sInterfaceType = ""; //This doesn't help me
super.stop();
}
public Status process() throws EventDeliveryException {
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
if((sInterfaceType=="" || sInterfaceType==null))
{
//Read first line & set sInterfaceType
}else
//Insert data in MySQL
transaction.commit();
}
}
我们必须手动决定它是哪个事件,没有为每个新文件调用专门的方法。
我修改了我的代码以读取事件行并根据第一个元素设置 InterfaceType。我的代码如下所示:
public Status process() throws EventDeliveryException {
//....other code...
sEvtBody = new String(event.getBody());
sFields = sEvtBody.split(",");
//check first field to know record type
enumRec = RecordType.valueOf( checkRecordType(sFields[0].toUpperCase()) );
switch(enumRec)
{
case CUST_ID:
sInterfaceType = "T_CUST";
bHeader = true;
break;
case TXN_ID:
sInterfaceType = "T_CUST_TXNS";
bHeader = true;
break;
default:
bHeader = false;
}
//insert if not header
if(!bHeader)
{
if(sInterfaceType == "T_CUST")
{
if(sFields.length == 14)
this.bInsertStatus = daoClass.insertHeader(sFields);
else
throw new Exception("INCORRECT_COLUMN_COUNT");
}else if(sInterfaceType == "T_CUST_TXNS")
{
if(sFields.length == 10)
this.bInsertStatus = daoClass.insertData(sFields);
else
throw new Exception("INCORRECT_COLUMN_COUNT");
}
//if(!bInsertStatus)
// logTransaction(sFields);
}
//....Other code....