How to read multi-line records in Spark, where each log record starts with a yyyy-MM-dd date and spans multiple lines?
So far I have implemented the following logic in Scala:
val hadoopConf = new Configuration(sc.hadoopConfiguration);
//hadoopConf.set("textinputformat.record.delimiter", "2016-")
hadoopConf.set("textinputformat.record.delimiter", "^([0-9]{4}.*)")
val accessLogs = sc.newAPIHadoopFile("/user/root/sample.log", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], hadoopConf).map(x=>x._2.toString)
I want a regex that recognizes lines starting with the date format and treats them as the start of a new record; any other line should keep being appended to the previous record.
But this does not work. If I pass a literal date prefix it works fine, as in the commented-out line below, which is where I would like to use a regex instead:
//hadoopConf.set("textinputformat.record.delimiter", "2016-")
Thanks in advance for any help.
Below is the sample format:
2016-12-23 07:00:09,693 [jetty-51 - /app/service] INFO org.apache.cxf.interceptor.LoggingOutInterceptor S:METHOD_NAME=METHNAME : WebAppSessionId= : ChannelSessionId=web-xxx-xxx-xxx : ClientIp=xxxxxxx : - Outbound Message
---------------------------
ID: 1978
Address: https://sample.domain.com/SampleService.xxx/basic
Encoding: UTF-8
Content-Type: text/xml
Headers: {Accept=[*/*], SOAPAction=["WebDomain.Service/app"]}
Payload: <soap:Envelope>
</soap:Envelope>
2016-12-26 08:00:01,514 [jetty-1195 - /app/service/serviceName] ERROR com.testservices.cache.impl.ActiveSpaceCacheHandler S:METHOD_NAME=ServiceInquiryWithBands : WebAppSessionId= : ChannelSessionId=SERVICE : ClientIp=client-ip : - ActiveSpaceCacheHandler:getServiceResponseFromCache(); exception: java.lang.Exception: getServiceResponseData: com.tibco.as.space.RuntimeASException: field key is not nullable and is missing in tuple for cachekey:Request.US
2016-12-26 08:00:01,624 [jetty-979 - /app/service/serviceName] ERROR com.testservices.cache.impl.ActiveSpaceCacheHandler S:METHOD_NAME=ServiceInquiryWithBands : WebAppSessionId= : ChannelSessionId=SERVICE : ClientIp=client-ip : - ActiveSpaceCacheHandler:setServiceResponseInCache(); exception: com.test.as.space.RuntimeASException: field key is not nullable and is missing in tuple for cachekey
I could not get this to work with a regex: the value of textinputformat.record.delimiter is treated as a literal byte sequence, not a regular expression, which is why the pattern approach fails. The best I could do was hadoopConf.set("textinputformat.record.delimiter", "\n20")
This may work for you, provided those characters never occur in the middle of a log entry. As a bonus, it future-proofs the code for dates up to 2099.
If you really need a regex, you could try http://dronamk.blogspot.co.uk/2013/03/regex-custom-input-format-for-hadoop.html
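If you would rather stay in plain Spark and avoid a custom InputFormat, another option is to read each file whole and split it with a regex lookahead. A minimal sketch (the `splitRecords` helper is hypothetical; it assumes each individual log file fits in executor memory, since `wholeTextFiles` loads a file as one string):

```scala
// Split before every line that begins with a yyyy-MM-dd timestamp;
// the zero-width lookahead keeps the timestamp with its record.
def splitRecords(content: String): Array[String] =
  content.split("(?m)(?=^\\d{4}-\\d{2}-\\d{2} )").filter(_.trim.nonEmpty)

// Usage with Spark (hypothetical path; one string per file):
// sc.wholeTextFiles("/user/root/sample.log")
//   .flatMap { case (_, content) => splitRecords(content) }
```

Unlike the delimiter trick below, this keeps the full timestamp on every record, but it gives up the streaming, splittable reading that TextInputFormat provides.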
My code:
// Create some dummy data
val s = """2016-12-23 07:00:09,693 [jetty-51 - /app/service] INFO org.apache.cxf.interceptor.LoggingOutInterceptor S:METHOD_NAME=METHNAME : WebAppSessionId= : ChannelSessionId=web-xxx-xxx-xxx : ClientIp=xxxxxxx : - Outbound Message
|---------------------------
| ID: 1978
| Address: https://sample.domain.com/SampleService.xxx/basic
| Encoding: UTF-8
| Content-Type: text/xml
| Headers: {Accept=[*/*], SOAPAction=["WebDomain.Service/app"]}
| Payload: <soap:Envelope>
| </soap:Envelope>
|2016-12-26 08:00:01,514 [jetty-1195 - /app/service/serviceName] ERROR com.testservices.cache.impl.ActiveSpaceCacheHandler S:METHOD_NAME=ServiceInquiryWithBands : WebAppSessionId= : ChannelSessionId=SERVICE : ClientIp=client-ip : - ActiveSpaceCacheHandler:getServiceResponseFromCache(); exception: java.lang.Exception: getServiceResponseData: com.tibco.as.space.RuntimeASException: field key is not nullable and is missing in tuple for cachekey:Request.US
|2016-12-26 08:00:01,624 [jetty-979 - /app/service/serviceName] ERROR com.testservices.cache.impl.ActiveSpaceCacheHandler S:METHOD_NAME=ServiceInquiryWithBands : WebAppSessionId= : ChannelSessionId=SERVICE : ClientIp=client-ip : - ActiveSpaceCacheHandler:setServiceResponseInCache(); exception: com.test.as.space.RuntimeASException: field key is not nullable and is missing in tuple for cachekey
""".stripMargin
import java.io._
val pw = new PrintWriter(new File("log.txt"))
pw.write(s)
pw.close
// Now process the data
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.LongWritable
import org.apache.spark.{SparkContext, SparkConf}
val conf = sc.getConf
sc.stop()
conf.registerKryoClasses(Array(classOf[org.apache.hadoop.io.LongWritable]))
val sc = new SparkContext(conf)
val hadoopConf = new Configuration(sc.hadoopConfiguration)
hadoopConf.set("textinputformat.record.delimiter", "\n20")
val accessLogs = sc.newAPIHadoopFile("log.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], hadoopConf)
accessLogs.map(x => x._2.toString).zipWithIndex().collect().foreach(println)
Note that I used zipWithIndex purely for debugging. The output is:
(2016-12-23 07:00:09,693 [jetty-51 - /app/service] INFO org.apache.cxf.interceptor.LoggingOutInterceptor S:METHOD_NAME=METHNAME : WebAppSessionId= : ChannelSessionId=web-xxx-xxx-xxx : ClientIp=xxxxxxx : - Outbound Message
---------------------------
ID: 1978
Address: https://sample.domain.com/SampleService.xxx/basic
Encoding: UTF-8
Content-Type: text/xml
Headers: {Accept=[*/*], SOAPAction=["WebDomain.Service/app"]}
Payload:
,0)
(16-12-26 08:00:01,514 [jetty-1195 - /app/service/serviceName] ERROR com.testservices.cache.impl.ActiveSpaceCacheHandler S:METHOD_NAME=ServiceInquiryWithBands : WebAppSessionId= : ChannelSessionId=SERVICE : ClientIp=client-ip : - ActiveSpaceCacheHandler:getServiceResponseFromCache(); exception: java.lang.Exception: getServiceResponseData: com.tibco.as.space.RuntimeASException: field key is not nullable and is missing in tuple for cachekey:Request.US,1)
(16-12-26 08:00:01,624 [jetty-979 - /app/service/serviceName] ERROR com.testservices.cache.impl.ActiveSpaceCacheHandler S:METHOD_NAME=ServiceInquiryWithBands : WebAppSessionId= : ChannelSessionId=SERVICE : ClientIp=client-ip : - ActiveSpaceCacheHandler:setServiceResponseInCache(); exception: com.test.as.space.RuntimeASException: field key is not nullable and is missing in tuple for cachekey
,2)
Note that the index is the second field in each output tuple.
I ran this code on an IBM Data Science Experience notebook with Scala 2.10 and Spark 1.6.
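A side effect of the "\n20" delimiter is visible in the output above: every record after the first loses its leading "20" (the later records start with 16-12-26 …), because the delimiter bytes are consumed. If you need the full timestamp back, a small post-processing step can restore it. A sketch (the `restorePrefix` helper is hypothetical; it assumes every record originally began with a 20xx year):

```scala
// Records split on "\n20" lose their leading "20"; only the first
// record (no preceding newline) keeps its full "2016-..." timestamp.
def restorePrefix(record: String): String =
  if (record.matches("(?s)^\\d{4}-\\d{2}-\\d{2}.*")) record // already intact
  else "20" + record                                        // re-attach the stripped "20"

// Usage on the RDD from the answer:
// accessLogs.map(x => restorePrefix(x._2.toString))
```

Checking for a full yyyy-MM-dd prefix (rather than `startsWith("20")`) matters: a stripped record from year 2020 would itself begin with "20" and would otherwise be left unrepaired.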