How to transform a DataFrame into an RDD[Point] instead of an RDD[Row]?
I have a DataFrame with many columns, created from a CSV file using a defined schema. The only column I am interested in is the one named "point", in which I build a Magellan Point(long, lat).
What I need to do now is create an RDD[Point] from that DataFrame.
Here is the code I tried, but it does not work, because rdd
ends up as an RDD[Row] instead of an RDD[Point].
import org.apache.spark.sql.types._
import org.apache.spark.sql.magellan.dsl.expressions._ // Magellan DSL: provides the point(...) column expression

val schema = StructType(Array(
  StructField("vendorId", StringType, false),
  StructField("lpep_pickup_datetime", StringType, false),
  StructField("Lpep_dropoff_datetime", StringType, false),
  StructField("Store_and_fwd_flag", StringType, false),
  StructField("RateCodeID", IntegerType, false),
  StructField("Pickup_longitude", DoubleType, false),
  StructField("Pickup_latitude", DoubleType, false),
  StructField("Dropoff_longitude", DoubleType, false),
  StructField("Dropoff_latitude", DoubleType, false),
  StructField("Passenger_count", IntegerType, false),
  StructField("Trip_distance", DoubleType, false),
  StructField("Fare_amount", StringType, false),
  StructField("Extra", StringType, false),
  StructField("MTA_tax", StringType, false),
  StructField("Tip_amount", StringType, false),
  StructField("Tolls_amount", StringType, false),
  StructField("Ehail_fee", StringType, false),
  StructField("improvement_surcharge", StringType, false),
  StructField("Total_amount", DoubleType, false),
  StructField("Payment_type", IntegerType, false),
  StructField("Trip_type", IntegerType, false)))

import spark.implicits._

val points = spark.read.option("mode", "DROPMALFORMED")
  .schema(schema)
  .csv("/home/riccardo/Scrivania/Progetto/Materiale/NYC-taxi/")
  .withColumn("point", point($"Pickup_longitude", $"Pickup_latitude"))
  .limit(2000)

val rdd = points.select("point").rdd
How can I get an RDD[Point] instead of an RDD[Row] from the DataFrame?
If that is not possible, what solution would you suggest? I need an RDD[Point] because I have to use a library that takes an RDD[Point] as input.
If I understand correctly, you want the result to be of your custom class type, i.e. Point, rather than of type Row.
Here is what I tried:
My sample input data is:
latitude,longitude
44.968046,-94.420307
44.968046,-94.420307
44.33328,-89.132008
33.755787,-116.359998
33.844843,-116.54911
44.92057,-93.44786
44.240309,-91.493619
44.968041,-94.419696
44.333304,-89.132027
I created my custom case class with a toString() override:
case class Pair(latitude: Double, longitude: Double) {
  override def toString: String = s"Pair($latitude, $longitude)"
}
Now I read the input file with Spark as a DataFrame and convert it to an RDD:
val df = sparkSession.read.option("inferSchema", "true")
  .option("header", "true")
  .csv("/home/prasadkhode/sample_input.csv")

df.printSchema()
df.show()

// Map each Row to a Pair by looking the columns up by name.
val rdd = df.rdd.map(row =>
  Pair(row.getAs[Double]("latitude"), row.getAs[Double]("longitude"))
)

println(s"df count : ${df.count}")
println(s"rdd count : ${rdd.count}")
rdd.take(20).foreach(println)
The final result looks like this:
root
|-- latitude: double (nullable = true)
|-- longitude: double (nullable = true)
+---------+-----------+
| latitude| longitude|
+---------+-----------+
|44.968046| -94.420307|
|44.968046| -94.420307|
| 44.33328| -89.132008|
|33.755787|-116.359998|
|33.844843| -116.54911|
| 44.92057| -93.44786|
|44.240309| -91.493619|
|44.968041| -94.419696|
|44.333304| -89.132027|
+---------+-----------+
df count : 9
rdd count : 9
Pair(44.968046, -94.420307)
Pair(44.968046, -94.420307)
Pair(44.33328, -89.132008)
Pair(33.755787, -116.359998)
Pair(33.844843, -116.54911)
Pair(44.92057, -93.44786)
Pair(44.240309, -91.493619)
Pair(44.968041, -94.419696)
Pair(44.333304, -89.132027)
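Applied to the question's DataFrame, the same row-mapping idea would look roughly like the sketch below. This assumes the computed "point" column holds magellan.Point values at runtime, so that getAs can recover them; treat it as a sketch, not a tested solution:

import magellan.Point
import org.apache.spark.rdd.RDD

// Pull the Magellan Point back out of each Row by column name.
val pointRdd: RDD[Point] =
  points.select("point").rdd.map(row => row.getAs[Point]("point"))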
Hope this helps... :-)
方法"as"和"rdd"可以提供帮助:
import spark.implicits._

case class Point(latitude: Double, longitude: Double)

val df = Seq((1.0, 2.0)).toDF("Pickup_longitude", "Pickup_latitude")
val rdd = df
  .select(
    $"Pickup_latitude".alias("latitude"),
    $"Pickup_longitude".alias("longitude"))
  .as[Point].rdd
rdd.foreach(println)

Output:

Point(2.0,1.0)
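If you need an RDD of Magellan points rather than of your own case class, you can combine both answers: read the raw coordinates into a typed Dataset and construct the Magellan Point in a final map step. A minimal sketch, assuming Magellan's Point(x, y) factory takes (longitude, latitude) as in its README examples (note this uses magellan.Point, not the Point case class defined above):

import magellan.Point
import spark.implicits._

// Plain case class carrying the raw coordinates.
case class Coord(longitude: Double, latitude: Double)

val pointRdd = points
  .select(
    $"Pickup_longitude".alias("longitude"),
    $"Pickup_latitude".alias("latitude"))
  .as[Coord]
  .rdd
  .map(c => Point(c.longitude, c.latitude)) // RDD[magellan.Point]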