从文本文件中提取特定列以在 Scala 中制作数据框

Extract specific columns form a text file to make a dataframe in scala

我需要清理 scala 中的一些数据。我有以下原始数据,它们存在于文本文件中:

06:36:15.718068 IP 10.0.0.1.5001 > 10.0.0.2.41516: Flags [.], ack 346, win 163, options [nop,nop,TS val 1654418 ecr 1654418], length 0
06:36:15.718078 IP 10.0.0.2.41516 > 10.0.0.1.5001: Flags [.], seq 1:65161, ack 0, win 58, options [nop,nop,TS val 1654418 ecr 1654418], length 65160

我需要按以下方式将所有这些都放在数据框中:

+----------------+-----------+----------+-------+--------+--------+--------+-----+
|time_stamp_0    |sender_ip_1|reciver_2 |s_por_3|r_por_4 |acknu_5 |winnum_6|len_7|
+----------------+-----------+----------+-------+--------+--------+--------+-----+
|06:36:15.718068 |10.0.0.1   |10.0.0.2  |5001   |41516   |346     |163     |  0  |
|06:36:15.718078 |10.0.0.2   |10.0.0.1  |41516  |5001    |  0     | 58     |65160|
+----------------+-----------+----------+-------+--------+--------+--------+-----+

我使用以下代码来获取上述数据帧。

  val customSchema = StructType(Array(
      StructField("time_stamp_0", StringType, true),
      StructField("sender_ip_1", StringType, true),
      StructField("receiver_ip_2", StringType, true),
      StructField("s_port_3", StringType, true),
      StructField("r_port_4", StringType, true),
      StructField("acknum_5", StringType, true),
      StructField("winnum_6", StringType, true),
      StructField("len_7", IntegerType, true)))

    ///////////////////////////////////////////////////make train dataframe
    val Dstream_Train = sc.textFile("/Users/xxxxxx/Desktop/xxxxx/Test/trace8.txt")
    val Row_Dstream_Train = Dstream_Train.map(line => line.split(">")).map(array => {
      val first = Try(array(0).trim.split(" ")(0)) getOrElse ""
      val second = Try(array(1).trim.split(" ")(0)) getOrElse ""
      val third = Try(array(2).trim.split(" ")(0).replace(":", "")) getOrElse ""
      val fourth = Try(array(3).trim.split(" ")(0)) getOrElse ""
      val fifth = Try(array(4).trim.split(" ") (0)) getOrElse ""
      val sixth = Try(array(5).trim.split(" ") (0)) getOrElse ""
      val seventh = Try(array(6).trim.split(" ")(0)) getOrElse ""
      val eighth = Try(array(7).trim.split(" ")(0)) getOrElse ""

      val firstFixed = first.take(first.lastIndexOf("."))
      val secondfix = second.take(second.lastIndexOf("."))
      val thirdFixed = third.take(third.lastIndexOf("."))
      Row.fromSeq(Seq(firstFixed, secondfix, thirdFixed, fourth,fifth,sixth,seventh,eighth))
    })
    val Frist_Dataframe = session.createDataFrame(Row_Dstream_Train, customSchema)

但是问题是第三列没有提取任何内容!你能指导我为什么第三列被提取为空吗?谢谢

在你第一次 map

之后
file.map(line => line.split(">") ).collect

您的输出是:

Array[Array[String]] = Array(
Array("06:36:15.718068 IP 10.0.0.1.5001 ", " 10.0.0.2.41516: Flags [.], ack 346, win 163, options [nop,nop,TS val 1654418 ecr 1654418], length 0"), 
Array("06:36:15.718078 IP 10.0.0.2.41516 ", " 10.0.0.1.5001: Flags [.], seq 1:65161, ack 0, win 58, options [nop,nop,TS val 1654418 ecr 1654418], length 65160"))

如您所见,您得到了两个数组,因此在您的下一个映射步骤中,您在 array(1) 之后引用的任何内容都是 ArrayIndexOutOfBoundsException

您需要深入挖掘并检查可以拆分的字符。这应该可以提取第三列。

val Dstream_Train = sc.textFile("/Users/xxxxxx/Desktop/xxxxx/Test/trace8.txt")
val third = Dstream_Train.map(line => line.split(">") ).map( x => x(1).split(":")(0).splitAt(x(1).split(":")(0).lastIndexOf("."))._1 ).collect
third: Array[String] = Array(" 10.0.0.2", " 10.0.0.1")

同样,您可以使用它来获取其他列,但正如建议的那样,RegEx 应该更干净、更容易。

您的输入数据不是固定长度的,因此获得您需要的解决方案有点棘手。考虑到您提供的以下输入数据可能是解决方案。您可以根据需要进行更改

val Row_Dstream_Train = Dstream_Train.map(line => line.split(",")).map(array => {

  val array1 = array(0).trim.split("IP")
  val array2 = array1(1).split(">")
  val array3 = array2(1).split(":")

  val acknum5 = if(array(1).contains("seq")) array(2) else array(1)
  val winnum6 = if(array(1).contains("seq")) array(3) else array(2)
  val len7 = if(array(1).contains("seq")) array(1).trim.split(" ")(1) else ""

  val first = Try(array1(0).trim) getOrElse ""
  val second = Try(array2(0).trim) getOrElse ""
  val third = Try(array3(0)) getOrElse ""
  val sixth = Try(acknum5.trim.split(" ")(1)) getOrElse ""
  val seventh = Try(winnum6.trim.split(" ")(1)) getOrElse ""
  val eighth = Try(len7.substring(len7.lastIndexOf(":")+1, len7.length).toInt) getOrElse 0

  val secondfix = second.take(second.lastIndexOf("."))
  val sport3 = second.substring(second.lastIndexOf(".")+1, second.length)
  val thirdFixed = third.take(third.lastIndexOf("."))
  val rport4 = third.substring(third.lastIndexOf(".")+1, third.length)

  Row.fromSeq(Seq(first, secondfix, thirdFixed, sport3,rport4,sixth,seventh,eighth))
})
val Frist_Dataframe = sqlContext.createDataFrame(Row_Dstream_Train, customSchema)

您将得到输出

+---------------+-----------+-------------+--------+--------+--------+--------+-----+
|time_stamp_0   |sender_ip_1|receiver_ip_2|s_port_3|r_port_4|acknum_5|winnum_6|len_7|
+---------------+-----------+-------------+--------+--------+--------+--------+-----+
|06:36:15.718068|10.0.0.1   | 10.0.0.2    |5001    |41516   |346     |163     |0    |
|06:36:15.718078|10.0.0.2   | 10.0.0.1    |41516   |5001    |0       |58      |65161|
+---------------+-----------+-------------+--------+--------+--------+--------+-----+

希望解决方案对您有所帮助

此代码存在许多问题。第一个需要在 Schema 中提及。您在架构中声明了一个 IntegerType,但如果它不存在,您将使用 StringType 作为空值。所以这需要改变。

此外,上面指出的数组是一个问题,因为您会遇到索引错误。

我刚刚看到 Ramesh 在我面前发布了一个答案,但这是使用 RegEx 的另一种方式。

RegEx 是解决此问题的另一种方法。如果您查看您的示例,您应该注意到实际上这两行在结构上是不同的。

这就是我为获得结果所做的工作(尽管可能需要对 Regex 进行更多测试以防万一)。所以从正则表达式案例开始:

object RegexPatterns{ // this needs to be done this way to avoid serialisation errors
  val patternTS: Regex = "([0-9]+:[0-9]+:[0-9]+.[0-9]+)".r
  val patternSIP1: Regex = "(?<=\b IP \b)([0-9]+.[0-9].[0-9].[0-9])(?=.[0-9]+)".r
  val patternRIP2: Regex = "(?<=\b > \b)([0-9]..[0-9].[0-9].[0-9])(?=.[0-9]+)".r
  val patternSP3: Regex = "(?<=\b IP \b[0-9]..[0-9].[0-9].[0-9].)([0-9]+)".r
  val patternRP4: Regex = "(?<=\b > \b[0-9]..[0-9].[0-9].[0-9].)([0-9]+)".r
  val patternAN5: Regex = "(?<=\back \b)([0-9]+)".r
  val patternWN6: Regex = "(?<=\bwin \b)([0-9]+)".r
  val patternL7: Regex = "(?<=\blength \b)([0-9]+)".r
}

你已经实现的代码:

import RegexPatterns._

val Dstream_Train = sc.textFile("/Users/xxxxxx/Desktop/xxxxx/Test/trace8.txt")

  val customSchema = StructType(Array(
    StructField("time_stamp_0", StringType, nullable = true),
    StructField("sender_ip_1", StringType, nullable = true),
    StructField("receiver_ip_2", StringType, nullable = true),
    StructField("s_port_3", StringType, nullable = true),
    StructField("r_port_4", StringType, nullable = true),
    StructField("acknum_5", StringType, nullable = true),
    StructField("winnum_6", StringType, nullable = true),
    StructField("len_7", StringType, nullable = true)))

  ///////////////////////////////////////////////////make train dataframe
  val Dstream_Train: RDD[String] = sparkContext.parallelize(input)
  val Row_Dstream_Train: RDD[Row] = Dstream_Train.map { line: String =>
    val first = Try((patternTS findAllIn line).mkString(",")) getOrElse ""
    val second = Try((patternSIP1 findAllIn line).mkString(",")) getOrElse ""
    val third = Try((patternRIP2 findAllIn line).mkString(",")) getOrElse ""
    val fourth = Try((patternSP3 findAllIn line).mkString(",")) getOrElse ""
    val fifth = Try((patternRP4 findAllIn line).mkString(",")) getOrElse ""
    val sixth = Try((patternAN5 findAllIn line).mkString(",")) getOrElse ""
    val seventh = Try((patternWN6 findAllIn line).mkString(",")) getOrElse ""
    val eighth = Try((patternL7 findAllIn line).mkString(",")) getOrElse ""

    Row.fromSeq(Seq(first, second, third, fourth, fifth, sixth, seventh, eighth))
  }

  val Frist_Dataframe = session.createDataFrame(Row_Dstream_Train, customSchema)
  Frist_Dataframe.show(false)

这产生:

+---------------+-----------+-------------+--------+--------+--------+--------+-----+
|time_stamp_0   |sender_ip_1|receiver_ip_2|s_port_3|r_port_4|acknum_5|winnum_6|len_7|
+---------------+-----------+-------------+--------+--------+--------+--------+-----+
|06:36:15.718068|10.0.0.1   |10.0.0.2     |5001    |41516   |346     |163     |0    |
|06:36:15.718078|10.0.0.2   |10.0.0.1     |41516   |5001    |0       |58      |65160|
+---------------+-----------+-------------+--------+--------+--------+--------+-----+