Extract specific columns from a text file to make a dataframe in Scala
I need to clean some data in Scala. I have the following raw data in a text file:
06:36:15.718068 IP 10.0.0.1.5001 > 10.0.0.2.41516: Flags [.], ack 346, win 163, options [nop,nop,TS val 1654418 ecr 1654418], length 0
06:36:15.718078 IP 10.0.0.2.41516 > 10.0.0.1.5001: Flags [.], seq 1:65161, ack 0, win 58, options [nop,nop,TS val 1654418 ecr 1654418], length 65160
I need to put all of it into a dataframe laid out as follows:
+---------------+-----------+-------------+--------+--------+--------+--------+-----+
|time_stamp_0   |sender_ip_1|receiver_ip_2|s_port_3|r_port_4|acknum_5|winnum_6|len_7|
+---------------+-----------+-------------+--------+--------+--------+--------+-----+
|06:36:15.718068|10.0.0.1   |10.0.0.2     |5001    |41516   |346     |163     |0    |
|06:36:15.718078|10.0.0.2   |10.0.0.1     |41516   |5001    |0       |58      |65160|
+---------------+-----------+-------------+--------+--------+--------+--------+-----+
I used the following code to try to get the dataframe above.
val customSchema = StructType(Array(
StructField("time_stamp_0", StringType, true),
StructField("sender_ip_1", StringType, true),
StructField("receiver_ip_2", StringType, true),
StructField("s_port_3", StringType, true),
StructField("r_port_4", StringType, true),
StructField("acknum_5", StringType, true),
StructField("winnum_6", StringType, true),
StructField("len_7", IntegerType, true)))
///////////////////////////////////////////////////make train dataframe
val Dstream_Train = sc.textFile("/Users/xxxxxx/Desktop/xxxxx/Test/trace8.txt")
val Row_Dstream_Train = Dstream_Train.map(line => line.split(">")).map(array => {
val first = Try(array(0).trim.split(" ")(0)) getOrElse ""
val second = Try(array(1).trim.split(" ")(0)) getOrElse ""
val third = Try(array(2).trim.split(" ")(0).replace(":", "")) getOrElse ""
val fourth = Try(array(3).trim.split(" ")(0)) getOrElse ""
val fifth = Try(array(4).trim.split(" ") (0)) getOrElse ""
val sixth = Try(array(5).trim.split(" ") (0)) getOrElse ""
val seventh = Try(array(6).trim.split(" ")(0)) getOrElse ""
val eighth = Try(array(7).trim.split(" ")(0)) getOrElse ""
val firstFixed = first.take(first.lastIndexOf("."))
val secondfix = second.take(second.lastIndexOf("."))
val thirdFixed = third.take(third.lastIndexOf("."))
Row.fromSeq(Seq(firstFixed, secondfix, thirdFixed, fourth,fifth,sixth,seventh,eighth))
})
val Frist_Dataframe = session.createDataFrame(Row_Dstream_Train, customSchema)
But the problem is that the third column extracts nothing! Can you explain why the third column comes out empty? Thanks.
After your first map step:
file.map(line => line.split(">") ).collect
the output is:
Array[Array[String]] = Array(
Array("06:36:15.718068 IP 10.0.0.1.5001 ", " 10.0.0.2.41516: Flags [.], ack 346, win 163, options [nop,nop,TS val 1654418 ecr 1654418], length 0"),
Array("06:36:15.718078 IP 10.0.0.2.41516 ", " 10.0.0.1.5001: Flags [.], seq 1:65161, ack 0, win 58, options [nop,nop,TS val 1654418 ecr 1654418], length 65160"))
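As you can see, each line is split into an array of only two elements, so in your next map step anything you reference after array(1) throws an ArrayIndexOutOfBoundsException. Because each access is wrapped in Try(...) getOrElse "", the exception is swallowed and the column silently becomes an empty string.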
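The effect can be reproduced without Spark. The following is a minimal sketch in plain Scala; the object name SplitDemo and the helper thirdColumn are only illustrative, and the helper simply repeats the expression the question uses for its third column.

```scala
import scala.util.Try

object SplitDemo {
  // First sample line from the question.
  val sample: String =
    "06:36:15.718068 IP 10.0.0.1.5001 > 10.0.0.2.41516: Flags [.], ack 346, win 163, " +
      "options [nop,nop,TS val 1654418 ecr 1654418], length 0"

  // Same expression as the question uses for the third column.
  def thirdColumn(line: String): String = {
    val array = line.split(">")
    // array has indices 0 and 1 only, so array(2) throws inside Try
    // and the fallback "" is returned.
    Try(array(2).trim.split(" ")(0).replace(":", "")).getOrElse("")
  }

  def main(args: Array[String]): Unit = {
    // The line contains a single '>', so the split has two elements.
    println(sample.split(">").length)
    // The third column is the empty fallback string.
    println(thirdColumn(sample).isEmpty)
  }
}
```

Dropping the Try while debugging makes the ArrayIndexOutOfBoundsException visible instead of hiding it behind an empty column.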
You need to dig a little deeper and check which characters you can split on. This should extract the third column:
val Dstream_Train = sc.textFile("/Users/xxxxxx/Desktop/xxxxx/Test/trace8.txt")
val third = Dstream_Train.map(line => line.split(">") ).map( x => x(1).split(":")(0).splitAt(x(1).split(":")(0).lastIndexOf("."))._1 ).collect
third: Array[String] = Array(" 10.0.0.2", " 10.0.0.1")
You can get the other columns in the same way, but as already suggested, RegEx should be cleaner and easier.
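As a rough illustration of the RegEx route, a single pattern with capture groups can pull out all eight fields at once. This is only a sketch in plain Scala (no Spark); the Packet case class, the TcpdumpLine object and the pattern itself are assumptions written against the two sample lines, and other tcpdump output may need a different pattern.

```scala
import scala.util.matching.Regex

object TcpdumpLine {
  // Hypothetical record type; the fields mirror the columns in the question.
  final case class Packet(
      timeStamp: String, senderIp: String, senderPort: String,
      receiverIp: String, receiverPort: String,
      ackNum: String, winNum: String, len: Int)

  // One pattern for the whole line. The "seq ..." part is optional because
  // only some lines carry it; everything between "win" and "length" is skipped.
  private val Line: Regex =
    """^(\S+) IP (\d+\.\d+\.\d+\.\d+)\.(\d+) > (\d+\.\d+\.\d+\.\d+)\.(\d+): Flags \[[^\]]*\],(?: seq \S+,)? ack (\d+), win (\d+),.* length (\d+)$""".r

  // Some(packet) when the whole line matches the pattern, None otherwise.
  def parse(line: String): Option[Packet] = line match {
    case Line(ts, sip, sport, rip, rport, ack, win, len) =>
      Some(Packet(ts, sip, sport, rip, rport, ack, win, len.toInt))
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      "06:36:15.718068 IP 10.0.0.1.5001 > 10.0.0.2.41516: Flags [.], ack 346, win 163, options [nop,nop,TS val 1654418 ecr 1654418], length 0",
      "06:36:15.718078 IP 10.0.0.2.41516 > 10.0.0.1.5001: Flags [.], seq 1:65161, ack 0, win 58, options [nop,nop,TS val 1654418 ecr 1654418], length 65160")
    lines.map(parse).foreach(println)
  }
}
```

Since parse returns None for lines that do not match, malformed lines can be filtered out rather than turned into rows of empty strings.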
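Your input data is not fixed-width, so getting the result you need is a bit tricky. For the input data you provided, the following could be a solution. You can change it as needed.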
val Row_Dstream_Train = Dstream_Train.map(line => line.split(",")).map(array => {
  // array(0) holds "<time> IP <sender>.<port> > <receiver>.<port>: Flags [.]"
  val array1 = array(0).trim.split("IP")
  val array2 = array1(1).split(">")
  val array3 = array2(1).split(":")
  // a "seq" field, when present, shifts ack and win one position to the right
  val hasSeq = array(1).contains("seq")
  val acknum5 = if (hasSeq) array(2) else array(1)
  val winnum6 = if (hasSeq) array(3) else array(2)
  val first = Try(array1(0).trim) getOrElse ""
  val second = Try(array2(0).trim) getOrElse ""
  val third = Try(array3(0).trim) getOrElse ""
  val sixth = Try(acknum5.trim.split(" ")(1)) getOrElse ""
  val seventh = Try(winnum6.trim.split(" ")(1)) getOrElse ""
  // the length is always the last comma-separated field: " length <n>"
  val eighth = Try(array.last.trim.split(" ")(1).toInt) getOrElse 0
  // the text after the last '.' is the port, the text before it is the IP
  val secondfix = second.take(second.lastIndexOf("."))
  val sport3 = second.substring(second.lastIndexOf(".")+1, second.length)
  val thirdFixed = third.take(third.lastIndexOf("."))
  val rport4 = third.substring(third.lastIndexOf(".")+1, third.length)
  Row.fromSeq(Seq(first, secondfix, thirdFixed, sport3, rport4, sixth, seventh, eighth))
})
val Frist_Dataframe = sqlContext.createDataFrame(Row_Dstream_Train, customSchema)
You will get the output
+---------------+-----------+-------------+--------+--------+--------+--------+-----+
|time_stamp_0   |sender_ip_1|receiver_ip_2|s_port_3|r_port_4|acknum_5|winnum_6|len_7|
+---------------+-----------+-------------+--------+--------+--------+--------+-----+
|06:36:15.718068|10.0.0.1   |10.0.0.2     |5001    |41516   |346     |163     |0    |
|06:36:15.718078|10.0.0.2   |10.0.0.1     |41516   |5001    |0       |58      |65160|
+---------------+-----------+-------------+--------+--------+--------+--------+-----+
I hope this solution helps.
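There are a number of problems with this code. The first concerns the schema: you declare len_7 as IntegerType, but you fall back to an empty String when the value is missing, so the value does not match the declared type. That needs to change.
Also, the array indexing pointed out above is a problem, because you will run into index errors.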
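One way to keep the value in line with the schema is to parse the length into an Option[Int] first and only then decide what goes into the row. The following is a minimal sketch in plain Scala; the names LengthField, parseLength and lengthCell are hypothetical, and using null for a missing value assumes the column stays declared as nullable, as it is in the question's schema.

```scala
import scala.util.Try

object LengthField {
  // Matches the trailing "length <n>" field of a line.
  private val LengthPattern = """\blength (\d+)""".r

  // Some(n) when "length <n>" is present and fits in an Int, None otherwise.
  def parseLength(line: String): Option[Int] =
    LengthPattern.findFirstMatchIn(line).flatMap(m => Try(m.group(1).toInt).toOption)

  // Value for a nullable integer column: a boxed Integer,
  // or null when the field is missing (never an empty String).
  def lengthCell(line: String): java.lang.Integer =
    parseLength(line).map(Int.box).orNull

  def main(args: Array[String]): Unit = {
    println(parseLength("Flags [.], ack 0, win 58, length 65160"))
    println(parseLength("a line without that field"))
  }
}
```

The alternative taken further down is to declare every column as StringType, which avoids the mismatch but leaves the conversion to numbers for later.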
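I have just seen that Ramesh posted an answer before me, but here is another way, using RegEx.
RegEx is another way to solve this. If you look at your sample, you should notice that the two lines are actually structurally different: only the second one has a seq field.
This is what I did to get the result (although the regexes may need more testing, just in case). Starting with the regex patterns: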
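import scala.util.matching.Regex

object RegexPatterns { // patterns live in an object to avoid serialisation errors
  // Triple-quoted strings keep \b as a regex word boundary; in a normal string literal "\b" is a backspace.
  val patternTS: Regex = """([0-9]+:[0-9]+:[0-9]+\.[0-9]+)""".r
  val patternSIP1: Regex = """(?<=\b IP \b)([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)(?=\.[0-9]+)""".r
  val patternRIP2: Regex = """(?<=\b > \b)([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)(?=\.[0-9]+)""".r
  // Look-behind needs a bounded length in Java regex, hence {1,3} instead of +
  val patternSP3: Regex = """(?<=\b IP \b[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.)([0-9]+)""".r
  val patternRP4: Regex = """(?<=\b > \b[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.)([0-9]+)""".r
  val patternAN5: Regex = """(?<=\back \b)([0-9]+)""".r
  val patternWN6: Regex = """(?<=\bwin \b)([0-9]+)""".r
  val patternL7: Regex = """(?<=\blength \b)([0-9]+)""".r
}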
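Then the code you have already implemented, adapted to use the patterns:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import scala.util.Try
import RegexPatterns._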
val Dstream_Train = sc.textFile("/Users/xxxxxx/Desktop/xxxxx/Test/trace8.txt")
val customSchema = StructType(Array(
StructField("time_stamp_0", StringType, nullable = true),
StructField("sender_ip_1", StringType, nullable = true),
StructField("receiver_ip_2", StringType, nullable = true),
StructField("s_port_3", StringType, nullable = true),
StructField("r_port_4", StringType, nullable = true),
StructField("acknum_5", StringType, nullable = true),
StructField("winnum_6", StringType, nullable = true),
StructField("len_7", StringType, nullable = true)))
// make train dataframe (reuses the Dstream_Train read from the file above)
val Row_Dstream_Train: RDD[Row] = Dstream_Train.map { line: String =>
val first = Try((patternTS findAllIn line).mkString(",")) getOrElse ""
val second = Try((patternSIP1 findAllIn line).mkString(",")) getOrElse ""
val third = Try((patternRIP2 findAllIn line).mkString(",")) getOrElse ""
val fourth = Try((patternSP3 findAllIn line).mkString(",")) getOrElse ""
val fifth = Try((patternRP4 findAllIn line).mkString(",")) getOrElse ""
val sixth = Try((patternAN5 findAllIn line).mkString(",")) getOrElse ""
val seventh = Try((patternWN6 findAllIn line).mkString(",")) getOrElse ""
val eighth = Try((patternL7 findAllIn line).mkString(",")) getOrElse ""
Row.fromSeq(Seq(first, second, third, fourth, fifth, sixth, seventh, eighth))
}
val Frist_Dataframe = session.createDataFrame(Row_Dstream_Train, customSchema)
Frist_Dataframe.show(false)
This produces:
+---------------+-----------+-------------+--------+--------+--------+--------+-----+
|time_stamp_0   |sender_ip_1|receiver_ip_2|s_port_3|r_port_4|acknum_5|winnum_6|len_7|
+---------------+-----------+-------------+--------+--------+--------+--------+-----+
|06:36:15.718068|10.0.0.1   |10.0.0.2     |5001    |41516   |346     |163     |0    |
|06:36:15.718078|10.0.0.2   |10.0.0.1     |41516   |5001    |0       |58      |65160|
+---------------+-----------+-------------+--------+--------+--------+--------+-----+