How to persist data into HDFS using Storm
I have a simple bolt that reads data from a Kafka spout and then writes it to a directory in HDFS. The problem is that the bolt does not write anything until the cluster is stopped. How can I make sure that, as the bolt reads a tuple from the Kafka spout, it writes it to HDFS immediately, or at least writes every 'n' entries? (I am using CDH 4.4, Hadoop 2.0.)
The bolt's Java code:
// Imports assume Storm 0.9.x (backtype.storm) and Hadoop 2.x APIs
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class PrinterBolt10 extends BaseRichBolt {

    private OutputCollector collector;
    private String values;
    Configuration configuration = null;
    FileSystem hdfs = null;
    FSDataOutputStream outputStream = null;
    BufferedWriter br = null;
    List<String> valList;
    String machineValue;
    int upTime;
    int downTime;
    int idleTime;

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        upTime = 0;
        downTime = 0;
        idleTime = 0;
        this.collector = collector;
        String timeStamp = new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime());
        try {
            configuration = new Configuration();
            configuration.set("fs.defaultFS", "hdfs://localhost.localdomain:8020");
            hdfs = FileSystem.get(configuration);
            outputStream = hdfs.create(new Path("/tmp/storm/StormHdfs/machine10_" + timeStamp + ".txt"));
            br = new BufferedWriter(new OutputStreamWriter(outputStream, "UTF-8"));
            br.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void execute(Tuple tuple) {
        values = tuple.toString();
        int start = values.indexOf('[');
        int end = values.indexOf(']');
        machineValue = values.substring(start + 1, end);
        String machine = machineValue.substring(0, machineValue.indexOf(','));
        String code = machineValue.substring(machineValue.indexOf(',') + 1);
        int codeInt = Integer.parseInt(code);
        if (codeInt == 0) idleTime += 30;
        else if (codeInt == 1) upTime += 30;
        else downTime += 30;
        String finalMessage = machine + " " + "upTime(s) :" + upTime + " " + "idleTime(s): " + idleTime + " " + "downTime: " + downTime;
        try {
            br.write(finalMessage); // *This is the writing part into HDFS*
            br.write('\n');
            br.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this bolt does not emit anything
    }

    public void cleanup() {}
}
EDIT: completely changed my answer.
You need to use HdfsBolt rather than relying on writing the files yourself. Using HdfsBolt takes away all the complication of working out when to flush to the file, opening buffered streams, and so on. See http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.1.3/bk_user-guide/content/ch_storm-using-hdfs-connector.html, but the bits you are interested in are:
// storm-hdfs connector imports
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

// Use pipe as record boundary
RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|");

// Synchronize the data buffer with the filesystem every 1000 tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// Rotate data files when they reach five MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

// Use default, Storm-generated file names
FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/foo");

// Instantiate the HdfsBolt
HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://localhost:54310")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);
Then simply pass the data from your current bolt into this one.
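For reference, a minimal sketch of what that wiring might look like in the topology. The component names, the kafkaSpout variable, and the assumption that PrinterBolt10 is changed to emit its finalMessage as a single field are illustrative, not taken from the original code:

// Sketch only, assuming Storm 0.9.x (backtype.storm.topology.TopologyBuilder)
// and that PrinterBolt10.execute() is changed to call
//   collector.emit(new Values(finalMessage));
// with declareOutputFields() declaring new Fields("message").
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", kafkaSpout);              // your existing KafkaSpout instance
builder.setBolt("machine-stats-bolt", new PrinterBolt10(), 1)
       .shuffleGrouping("kafka-spout");
builder.setBolt("hdfs-bolt", bolt, 1)                     // the HdfsBolt configured above
       .shuffleGrouping("machine-stats-bolt");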
You should use HdfsBolt to insert data into HDFS, with the configuration described in the answer above. For testing purposes, do not set the SyncPolicy count to 1000; set it to some small value (say 10-20), because that number says after how many tuples emitted by the spout the data should be written to HDFS. For example, if you configure
SyncPolicy syncPolicy = new CountSyncPolicy(10);
then you will be able to see the data you inserted into Kafka after 10 messages.
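As a rough illustration, reusing the format, file name and rotation objects from the snippet above (the variable names here are only placeholders), the test variant could look like this:

// Test-friendly sync policy: data becomes visible in HDFS after every 10 tuples.
SyncPolicy testSyncPolicy = new CountSyncPolicy(10);

HdfsBolt testBolt = new HdfsBolt()
        .withFsUrl("hdfs://localhost:54310")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(testSyncPolicy);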