如何在从套接字源数据创建数据帧时指定 shema?
How to specify shema while creating dataframe from socket sourced data?
我对 spark 还很陌生。我在端口 9999
-
上以以下格式流式传输日志文件
2019-09-15 23:45:37,370|10.1.198.43|splunk|headObject|splunk|splunk|160|0|55|246|461|1322|_introspection%2Fdma%2F27%2F99%2F103%7E955F0453-052A-4B49-BC09-8259A2B608E4%2F5C218CC9-77D3-4BBE-AD24-A5FB31CC56F2_DM_SplunkforPaloAltoNetworks_pan_wildfire_report%2Freceipt.json|404|2d86e6fa-c7e4-1c5b-8d12-54ab3a911327|0|NoSuchKey|
2019-09-15 23:45:37,379|10.1.198.53|splunk|getObject|splunk|splunk|160|0|55|246|461|1567|_introspection%2Fdma%2F27%2F99%2F103%7E955F0453-052A-4B49-BC09-8259A2B608E4%2F5C218CC9-77D3-4BBE-AD24-A5FB31CC56F2_DM_SplunkforPaloAltoNetworks_pan_wildfire_report%2Freceipt.json|404|2d86e6fc-c7e4-1c5b-8d12-54ab3a911327|0|NoSuchKey|
2019-09-15 23:45:37,430|10.1.198.53|splunk|headObject|splunk|splunk|160|0|55|246|461|1329|_introspection%2Fdma%2F27%2F99%2F103%7E955F0453-052A-4B49-BC09-8259A2B608E4%2F5C218CC9-77D3-4BBE-AD24-A5FB31CC56F2_DM_SplunkforPaloAltoNetworks_pan_wildfire_report%2Freceipt.json|404|2d86e6fe-c7e4-1c5b-8d12-54ab3a911327|0|NoSuchKey|
2019-09-15 23:45:38,545|10.29.2.5||unknown|||0|0|0|250|250|223||400|2d86e700-c7e4-1c5b-8d12-54ab3a911327|0|InvalidBucketName|
2019-09-15 23:45:38,614|10.29.2.6||unknown|||0|0|0|250|250|187||400|2d86e702-c7e4-1c5b-8d12-54ab3a911327|0|InvalidBucketName|
我的数据帧流编码是 -
val linesDF: DataFrame = spark
.readStream
.format("socket")
.option("host", "127.0.0.1")
.option("port", 9999)
.option("delimiter", "|")
.schema(schema1)
.load()
但是我收到一个错误,提示我无法使用 socket
源指定架构。如何读取这些数据?
我的架构如下所示 -
val schemaString = "dttm|ip|bktownr|oper|bktnm|usr|" +
"reqhdr|reqbd|reshdr|resbd|totsize|" +
"duration|objnm|httpstts|s3reqid|etag|errcd|srcbkt"
val schema1 = StructType(
schemaString
.split('|')
.map(fieldName => StructField(fieldName, StringType, true))
)
删除该选项后,我知道我可以读取数据,但它是在名为 value
的单列下读取的。我进行了将数据加载到静态数据框中并在那里提供模式的实验。但在这种情况下,这是行不通的,我实际上想将流程转换为流式传输,数据将通过套接字 9999
流式传输。
我还有其他遗漏的地方吗?
如果您尝试以 spark 的结构化流方式读取日志,您可以直接指定日志文件目录,spark 也会在流中读取它以及新文件。这是我解决这个问题的方法 -
val rawData = spark
.readStream
.format("csv")
.option("delimiter", "|")
.option("header", "true")
.schema(customSchema)
.load("/Users/username/logs/cloud-logs")
我对 spark 还很陌生。我在端口 9999
-
2019-09-15 23:45:37,370|10.1.198.43|splunk|headObject|splunk|splunk|160|0|55|246|461|1322|_introspection%2Fdma%2F27%2F99%2F103%7E955F0453-052A-4B49-BC09-8259A2B608E4%2F5C218CC9-77D3-4BBE-AD24-A5FB31CC56F2_DM_SplunkforPaloAltoNetworks_pan_wildfire_report%2Freceipt.json|404|2d86e6fa-c7e4-1c5b-8d12-54ab3a911327|0|NoSuchKey|
2019-09-15 23:45:37,379|10.1.198.53|splunk|getObject|splunk|splunk|160|0|55|246|461|1567|_introspection%2Fdma%2F27%2F99%2F103%7E955F0453-052A-4B49-BC09-8259A2B608E4%2F5C218CC9-77D3-4BBE-AD24-A5FB31CC56F2_DM_SplunkforPaloAltoNetworks_pan_wildfire_report%2Freceipt.json|404|2d86e6fc-c7e4-1c5b-8d12-54ab3a911327|0|NoSuchKey|
2019-09-15 23:45:37,430|10.1.198.53|splunk|headObject|splunk|splunk|160|0|55|246|461|1329|_introspection%2Fdma%2F27%2F99%2F103%7E955F0453-052A-4B49-BC09-8259A2B608E4%2F5C218CC9-77D3-4BBE-AD24-A5FB31CC56F2_DM_SplunkforPaloAltoNetworks_pan_wildfire_report%2Freceipt.json|404|2d86e6fe-c7e4-1c5b-8d12-54ab3a911327|0|NoSuchKey|
2019-09-15 23:45:38,545|10.29.2.5||unknown|||0|0|0|250|250|223||400|2d86e700-c7e4-1c5b-8d12-54ab3a911327|0|InvalidBucketName|
2019-09-15 23:45:38,614|10.29.2.6||unknown|||0|0|0|250|250|187||400|2d86e702-c7e4-1c5b-8d12-54ab3a911327|0|InvalidBucketName|
我的数据帧流编码是 -
val linesDF: DataFrame = spark
.readStream
.format("socket")
.option("host", "127.0.0.1")
.option("port", 9999)
.option("delimiter", "|")
.schema(schema1)
.load()
但是我收到一个错误,提示我无法使用 socket
源指定架构。如何读取这些数据?
我的架构如下所示 -
val schemaString = "dttm|ip|bktownr|oper|bktnm|usr|" +
"reqhdr|reqbd|reshdr|resbd|totsize|" +
"duration|objnm|httpstts|s3reqid|etag|errcd|srcbkt"
val schema1 = StructType(
schemaString
.split('|')
.map(fieldName => StructField(fieldName, StringType, true))
)
删除该选项后,我知道我可以读取数据,但它是在名为 value
的单列下读取的。我进行了将数据加载到静态数据框中并在那里提供模式的实验。但在这种情况下,这是行不通的,我实际上想将流程转换为流式传输,数据将通过套接字 9999
流式传输。
我还有其他遗漏的地方吗?
如果您尝试以 spark 的结构化流方式读取日志,您可以直接指定日志文件目录,spark 也会在流中读取它以及新文件。这是我解决这个问题的方法 -
val rawData = spark
.readStream
.format("csv")
.option("delimiter", "|")
.option("header", "true")
.schema(customSchema)
.load("/Users/username/logs/cloud-logs")