如何在从套接字源数据创建数据帧时指定 shema?

How to specify shema while creating dataframe from socket sourced data?

我对 spark 还很陌生。我在端口 9999 -

上以以下格式流式传输日志文件
2019-09-15 23:45:37,370|10.1.198.43|splunk|headObject|splunk|splunk|160|0|55|246|461|1322|_introspection%2Fdma%2F27%2F99%2F103%7E955F0453-052A-4B49-BC09-8259A2B608E4%2F5C218CC9-77D3-4BBE-AD24-A5FB31CC56F2_DM_SplunkforPaloAltoNetworks_pan_wildfire_report%2Freceipt.json|404|2d86e6fa-c7e4-1c5b-8d12-54ab3a911327|0|NoSuchKey|
2019-09-15 23:45:37,379|10.1.198.53|splunk|getObject|splunk|splunk|160|0|55|246|461|1567|_introspection%2Fdma%2F27%2F99%2F103%7E955F0453-052A-4B49-BC09-8259A2B608E4%2F5C218CC9-77D3-4BBE-AD24-A5FB31CC56F2_DM_SplunkforPaloAltoNetworks_pan_wildfire_report%2Freceipt.json|404|2d86e6fc-c7e4-1c5b-8d12-54ab3a911327|0|NoSuchKey|
2019-09-15 23:45:37,430|10.1.198.53|splunk|headObject|splunk|splunk|160|0|55|246|461|1329|_introspection%2Fdma%2F27%2F99%2F103%7E955F0453-052A-4B49-BC09-8259A2B608E4%2F5C218CC9-77D3-4BBE-AD24-A5FB31CC56F2_DM_SplunkforPaloAltoNetworks_pan_wildfire_report%2Freceipt.json|404|2d86e6fe-c7e4-1c5b-8d12-54ab3a911327|0|NoSuchKey|
2019-09-15 23:45:38,545|10.29.2.5||unknown|||0|0|0|250|250|223||400|2d86e700-c7e4-1c5b-8d12-54ab3a911327|0|InvalidBucketName|
2019-09-15 23:45:38,614|10.29.2.6||unknown|||0|0|0|250|250|187||400|2d86e702-c7e4-1c5b-8d12-54ab3a911327|0|InvalidBucketName|

我的数据帧流编码是 -

val linesDF: DataFrame = spark
      .readStream
      .format("socket")
      .option("host", "127.0.0.1")
      .option("port", 9999)
      .option("delimiter", "|")
      .schema(schema1)
      .load()

但是我收到一个错误,提示我无法使用 socket 源指定架构。如何读取这些数据?

我的架构如下所示 -

val schemaString = "dttm|ip|bktownr|oper|bktnm|usr|" +
      "reqhdr|reqbd|reshdr|resbd|totsize|" +
      "duration|objnm|httpstts|s3reqid|etag|errcd|srcbkt"

    val schema1 = StructType(
      schemaString
        .split('|')
        .map(fieldName => StructField(fieldName, StringType, true))
    )

删除该选项后,我知道我可以读取数据,但它是在名为 value 的单列下读取的。我进行了将数据加载到静态数据框中并在那里提供模式的实验。但在这种情况下,这是行不通的,我实际上想将流程转换为流式传输,数据将通过套接字 9999 流式传输。

我还有其他遗漏的地方吗?

如果您尝试以 spark 的结构化流方式读取日志,您可以直接指定日志文件目录,spark 也会在流中读取它以及新文件。这是我解决这个问题的方法 -

val rawData = spark
      .readStream
      .format("csv")
      .option("delimiter", "|")
      .option("header", "true")
      .schema(customSchema)
      .load("/Users/username/logs/cloud-logs")