如何从 Apache Spark 中定期附加的日志文件中获取数据?
How to get data from regularly appended log file in Apache Spark?
我有一个 Apache 访问日志文件,其中有一些数据,而且还在不断增加。我想使用 Apache Spark Streaming API.
分析该数据
Spark 对我来说是新的,我创建了一个程序,在其中我使用 jssc.textFileStream(directory)
函数来获取日志数据。但它不符合我的要求。
请建议我使用 spark 分析该日志文件的一些方法。
这是我的代码。
SparkConf conf = new SparkConf()
.setMaster("spark://192.168.1.9:7077")
.setAppName("log streaming")
.setSparkHome("/usr/local/spark")
.setJars(new String[] { "target/sparkstreamingdemo-0.0.1.jar" });
StreamingContext ssc = new StreamingContext(conf, new Duration(5000));
DStream<String> filerdd = ssc.textFileStream("/home/user/logs");
filerdd.print();
ssc.start();
ssc.awaitTermination();
此代码不 return 现有文件中的任何数据。这仅在我创建新文件时有效,但当我更新该新文件时,程序再次不会 return 更新数据。
如果实时修改文件,您可以使用 Apache Commons IO 中的 Tailer。
这是最简单的示例:
public void readLogs(File f, long delay) {
TailerListener listener = new MyTailerListener();
Tailer tailer = new Tailer(f, listener, delay);
// stupid executor impl. for demo purposes
Executor executor = new Executor() {
public void execute(Runnable command) {
command.run();
}
};
executor.execute(tailer);
}
public class MyTailerListener extends TailerListenerAdapter {
public void handle(String line) {
System.out.println(line);
}
}
上面的代码可以用作日志 reader for Apache Flume and applied as a source. Then you need to configure Flume sink to redirect collected logs to Spark stream and apply Spark for analyzing data from Flume stream (http://spark.apache.org/docs/latest/streaming-flume-integration.html)
此 post 中有关 Flume 设置的更多详细信息:
我有一个 Apache 访问日志文件,其中有一些数据,而且还在不断增加。我想使用 Apache Spark Streaming API.
分析该数据Spark 对我来说是新的,我创建了一个程序,在其中我使用 jssc.textFileStream(directory)
函数来获取日志数据。但它不符合我的要求。
请建议我使用 spark 分析该日志文件的一些方法。
这是我的代码。
SparkConf conf = new SparkConf()
.setMaster("spark://192.168.1.9:7077")
.setAppName("log streaming")
.setSparkHome("/usr/local/spark")
.setJars(new String[] { "target/sparkstreamingdemo-0.0.1.jar" });
StreamingContext ssc = new StreamingContext(conf, new Duration(5000));
DStream<String> filerdd = ssc.textFileStream("/home/user/logs");
filerdd.print();
ssc.start();
ssc.awaitTermination();
此代码不 return 现有文件中的任何数据。这仅在我创建新文件时有效,但当我更新该新文件时,程序再次不会 return 更新数据。
如果实时修改文件,您可以使用 Apache Commons IO 中的 Tailer。 这是最简单的示例:
public void readLogs(File f, long delay) {
TailerListener listener = new MyTailerListener();
Tailer tailer = new Tailer(f, listener, delay);
// stupid executor impl. for demo purposes
Executor executor = new Executor() {
public void execute(Runnable command) {
command.run();
}
};
executor.execute(tailer);
}
public class MyTailerListener extends TailerListenerAdapter {
public void handle(String line) {
System.out.println(line);
}
}
上面的代码可以用作日志 reader for Apache Flume and applied as a source. Then you need to configure Flume sink to redirect collected logs to Spark stream and apply Spark for analyzing data from Flume stream (http://spark.apache.org/docs/latest/streaming-flume-integration.html)
此 post 中有关 Flume 设置的更多详细信息: