My feature column becomes null in the dataframe
I am new to Spark and I need to do some machine learning on my data and predict the "count" value. Here is my raw data:
05:49:56.604899 00:00:00:00:00:02 > 00:00:00:00:00:03, ethertype IPv4 (0x0800), length 10202: 10.0.0.2.54880 > 10.0.0.3.5001: Flags [.], seq 3641977583:3641987719, ack 129899328, win 58, options [nop,nop,TS val 432623 ecr 432619], length 10136
05:49:56.604908 00:00:00:00:00:03 > 00:00:00:00:00:02, ethertype IPv4 (0x0800), length 66: 10.0.0.3.5001 > 10.0.0.2.54880: Flags [.], ack 10136, win 153, options [nop,nop,TS val 432623 ecr 432623], length 0
I built a dataframe with the columns time_stamp_0, sender_ip_1 and receiver_ip_2 using the following code:
import scala.util.Try
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import session.implicits._ // for the $"..." column syntax

val customSchema = StructType(Array(
StructField("time_stamp_0", StringType, true),
StructField("sender_ip_1", StringType, true),
StructField("receiver_ip_2", StringType, true)))
///////////////////////////////////////////////////make train dataframe
val Dstream_Train = sc.textFile("/Users/saeedtkh/Desktop/sharedsaeed/Test/trace1.txt")
val Row_Dstream_Train = Dstream_Train.map(line => line.split(">")).map(array => {
val first = Try(array(0).trim.split(" ")(0)) getOrElse ""
val second = Try(array(1).trim.split(" ")(6)) getOrElse ""
val third = Try(array(2).trim.split(" ")(0).replace(":", "")) getOrElse ""
val firstFixed = first.take(first.lastIndexOf("."))
val secondfix = second.take(second.lastIndexOf("."))
val thirdFixed = third.take(third.lastIndexOf("."))
Row.fromSeq(Seq(firstFixed, secondfix, thirdFixed))
})
val Frist_Dataframe = session.createDataFrame(Row_Dstream_Train, customSchema).toDF("time_stamp_0", "sender_ip_1", "receiver_ip_2")
val columns1and2 = Window.partitionBy("sender_ip_1", "receiver_ip_2") // <-- matches groupBy
///I add count to the dataframe
val Dataframe = Frist_Dataframe.withColumn("count", count($"receiver_ip_2") over columns1and2)
Dataframe.show()
Here is the output:
+------------+-----------+-------------+-----+
|time_stamp_0|sender_ip_1|receiver_ip_2|count|
+------------+-----------+-------------+-----+
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.3| 10.0.0.2| 10|
+------------+-----------+-------------+-----+
I want to predict the number of connections between the two IPs. I added the count to the dataframe, and I also tried to create the label and features to start the prediction. I also need to split the data into training and testing parts. I used the following code:
import org.apache.spark.ml.linalg.{Vector, Vectors} // assuming the DataFrame-based spark.ml vectors

val toVec4 = udf[Vector, Int, Int, String, String] { (a,b,c,d) =>
val e3 = c match {
case "10.0.0.1" => 1
case "10.0.0.2" => 2
case "10.0.0.3" => 3
}
val e4 = d match {
case "10.0.0.1" => 1
case "10.0.0.2" => 2
case "10.0.0.3" => 3
}
Vectors.dense(a, b, e3, e4)
}
//val encodeLabel = udf[Double, String]( _ match { case "A" => 0.0 case "B" => 1.0} )
val final_df = Dataframe.withColumn(
"features",
toVec4(
Dataframe("time_stamp_0"),
Dataframe("count"),
Dataframe("sender_ip_1"),
Dataframe("receiver_ip_2")
)
).withColumn("label", (Dataframe("count"))).select("features", "label")
final_df.show()
val trainingTest = final_df.randomSplit(Array(0.3, 0.7))
val TrainingDF = trainingTest(0)
val TestingDF=trainingTest(1)
//TrainingDF.show()
//TestingDF.show()
But the problem is that the features column becomes null!
+--------+-----+
|features|label|
+--------+-----+
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 10|
+--------+-----+
Can anyone help me with this problem? Thanks in advance.
The problem here is that your UDF expects the four input columns to be of types Int, Int, String, String, while you are passing a String as the first column (time_stamp_0).

You can fix this either by adjusting the UDF or by casting that field to Int:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val final_df = df.withColumn(
"features",
toVec4(
// casting into Timestamp to parse the string, and then into Int
$"time_stamp_0".cast(TimestampType).cast(IntegerType),
$"count",
$"sender_ip_1",
$"receiver_ip_2"
)
)
I must say I would have expected a proper exception instead of a null result, but apparently this is the current behavior.
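For the other route mentioned above, adjusting the UDF itself, a rough sketch could look like the following. This is untested and only illustrative; it assumes the "HH:mm:ss" timestamp format shown in your sample data and the spark.ml Vectors used in your original toVec4. Only the first parameter type changes from Int to String, and the string is parsed into an Int inside the UDF:

import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.udf

// Same shape as the original toVec4, except the timestamp comes in as a String
// and is converted to seconds since midnight inside the UDF.
val toVec4 = udf[Vector, String, Int, String, String] { (t, b, c, d) =>
  val Array(h, m, s) = t.split(":").map(_.toInt) // "05:49:56" -> 5, 49, 56
  val seconds = h * 3600 + m * 60 + s
  val e3 = c match {
    case "10.0.0.1" => 1
    case "10.0.0.2" => 2
    case "10.0.0.3" => 3
  }
  val e4 = d match {
    case "10.0.0.1" => 1
    case "10.0.0.2" => 2
    case "10.0.0.3" => 3
  }
  Vectors.dense(seconds, b, e3, e4)
}

With this version you would pass $"time_stamp_0" directly, without any cast.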