How to parse log lines using Spark that could span multiple lines
I'm working on a Spark/Scala application that reads and parses custom log files. I'm having trouble parsing log entries that span multiple lines. Here is a snippet of my code:
case class MLog(dateTime: String, classification: String, serverType: String, identification: String, operation: String)

val PATTERN = """(?s)(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\s+(\w+)\s+\[(.*)\]\s+\[(.*)\]\s+(.*)""".r

def parseLogLine(log: String): MLog = {
  val res = PATTERN.findFirstMatchIn(log)
  if (res.isEmpty) {
    throw new RuntimeException("Cannot parse log line: " + log)
  }
  val m = res.get
  MLog(m.group(1), m.group(2), m.group(3), m.group(4), m.group(5))
}

sc.textFile("/mydirectory/logfile").map(parseLogLine).foreach(println)
Some entries in the log file span multiple lines. The regular expression works for single-line entries, but when a multi-line entry such as the one below is read,
2015-08-31 00:10:17,682 WARN [ScheduledTask-10] [name=custname;mid=9999;ds=anyvalue;] datasource - Scheduled DataSource import failed.
com.xxx.common.service.ServiceException: system failure: Unable to connect to ANY server: LdapDataSource{id=xxx, type=xxx, enabled=true, name=xxx, host=xxx port=999, connectionType=ssl, username=xxx, folderId=99999}
I get this error:
Cannot parse log line: com.xxx.common.service.ServiceException: system failure: Unable to connect to ANY server: LdapDataSource{id=xxx, type=xxx, enabled=true, name=xxx, host=xxx port=999, connectionType=ssl, username=xxx, folderId=99999}
How can I get Spark to read multi-line log entries from the log file?
Since the input files are small, you can use SparkContext.wholeTextFiles, which reads each file as a single string instead of splitting it into lines:
import org.apache.spark.rdd.RDD

// Parse the content of a single file and return all extracted entries
def parseLogFile(log: String): Iterator[MLog] = {
  val p: scala.util.matching.Regex = ???  // your entry-matching pattern goes here
  p.findAllMatchIn(log).map(
    m => MLog(m.group(1), m.group(2), m.group(3), m.group(4), m.group(5))
  )
}

// wholeTextFiles yields (path, content) pairs, so each file arrives
// as one string and entries are free to span line breaks
val rdd: RDD[MLog] = sc
  .wholeTextFiles("/path/to/input/dir")
  .flatMap { case (_, txt) => parseLogFile(txt) }
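Note that your original PATTERN cannot be dropped in as-is here: with (?s), the greedy trailing (.*) would let a single match swallow the rest of the file. A minimal sketch of one way to bound each entry, assuming every entry starts with a yyyy-MM-dd HH:mm:ss,SSS timestamp, is to make the groups lazy and stop each match at the next timestamp (or end of input) with a lookahead:

// A sketch, assuming each entry begins with a timestamp; the lazy (.*?) plus the
// lookahead stops every match at the next entry's timestamp or at end of input,
// so stack traces on continuation lines stay attached to their own entry.
val entryPattern: scala.util.matching.Regex =
  """(?s)(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\s+(\w+)\s+\[(.*?)\]\s+\[(.*?)\]\s+(.*?)(?=\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3}|\z)""".r

// Quick local check against a two-entry sample whose first entry spans two lines
val sample =
  """2015-08-31 00:10:17,682 WARN [ScheduledTask-10] [name=custname;mid=9999;ds=anyvalue;] datasource - Scheduled DataSource import failed.
    |com.xxx.common.service.ServiceException: system failure
    |2015-08-31 00:11:03,001 INFO [ScheduledTask-11] [name=other;] datasource - import succeeded.""".stripMargin

entryPattern.findAllMatchIn(sample).foreach(m => println(m.group(1) + " " + m.group(2)))
// prints:
// 2015-08-31 00:10:17,682 WARN
// 2015-08-31 00:11:03,001 INFO

The lookahead assumes a timestamp-like string never appears mid-message; if that can happen in your logs, anchor the lookahead more tightly (for example to a timestamp at the start of a line).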