Pyspark and error "TypeError: must be real number, not Column", when attempting to find compass bearing using a defined function over a window
I am trying to apply a function that finds the compass bearing between two points over a window in PySpark (using the Community Edition of Databricks). I'm new to Spark, and I'm sure I'm missing something.
I have a test DataFrame that looks like this (borrowed from a similar question, with an added column called "bin"):
Timestamp,User,Latitude,Longitude,bin,BID
1462838468,49B4361512443A4DA,39.777982,-7.054599,A1,49B4361512443A4DA A1
1462838512,49B4361512443A4DA,39.777982,-7.054599,A1,49B4361512443A4DA A1
1462838389,49B4361512443A4DA,39.777982,-7.054599,A1,49B4361512443A4DA A1
1462838497,49B4361512443A4DA,39.777982,-7.054599,A1,49B4361512443A4DA A1
1465975885,6E9E0581E2A032FD8,37.118362,-8.205041,A1,6E9E0581E2A032FD8 A1
1457723815,405C238E25FE0B9E7,37.177322,-7.426781,A1,405C238E25FE0B9E7 A1
1457897289,405C238E25FE0B9E7,37.177922,-7.447443,A1,405C238E25FE0B9E7 A1
1457899229,405C238E25FE0B9E7,37.177922,-7.447443,A1,405C238E25FE0B9E7 A1
1457972626,405C238E25FE0B9E7,37.18059,-7.46128,A1,405C238E25FE0B9E7 A1
1458062553,405C238E25FE0B9E7,37.177322,-7.426781,A1,405C238E25FE0B9E7 A1
1458241825,405C238E25FE0B9E7,37.178172,-7.444512,A1,405C238E25FE0B9E7 A1
1458244457,405C238E25FE0B9E7,37.178172,-7.444512,A1,405C238E25FE0B9E7 A1
1458412513,405C238E25FE0B9E7,37.177322,-7.426781,A1,405C238E25FE0B9E7 A1
1458412292,405C238E25FE0B9E7,37.177322,-7.426781,A1,405C238E25FE0B9E7 A1
1465197963,6E9E0581E2A032FD8,37.118362,-8.205041,B2,6E9E0581E2A032FD8 B2
1465202192,6E9E0581E2A032FD8,37.118362,-8.205041,B2,6E9E0581E2A032FD8 B2
1465923817,6E9E0581E2A032FD8,37.118362,-8.205041,B2,6E9E0581E2A032FD8 B2
1465923766,6E9E0581E2A032FD8,37.118362,-8.205041,B2,6E9E0581E2A032FD8 B2
1465923748,6E9E0581E2A032FD8,37.118362,-8.205041,B2,6E9E0581E2A032FD8 B2
1465923922,6E9E0581E2A032FD8,37.118362,-8.205041,B2,6E9E0581E2A032FD8 B2
These are the libraries:
from pyspark.sql.functions import acos, cos, sin, lit, toRadians
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import math
from pyspark.sql.functions import concat, col, lit, lag, udf
from pyspark.sql.types import LongType
This is the function:
def direction_lookup(destination_x, origin_x, destination_y, origin_y):
    deltaX = destination_x - origin_x
    deltaY = destination_y - origin_y
    degrees_temp = math.atan2(deltaX, deltaY) / math.pi * 180
    if degrees_temp < 0:
        degrees_final = 360 + degrees_temp
    else:
        degrees_final = degrees_temp
    return degrees_final
I define the window using the concatenation of "User" and "bin" (which I named "BID"), ordered by timestamp:
w = Window().partitionBy("BID").orderBy("Timestamp")
Then I try to calculate the bearing between each timestamp like this:
bearing_df = df2.withColumn("bearing", bearing("Longitude", lag("Longitude", 1).over(w), "Latitude", lag("Latitude", 1).over(w)))
The function works if I just pass it plain coordinates, so there is something wrong with the way I'm applying it in Spark. I have also successfully used the same workflow to calculate the distance between points (with a different function, obviously). I tried registering it as a UDF and got the same error, "TypeError: must be real number, not Column". I'm not sure what to try next, and I'm still quite new to PySpark. Please help!
Columns in a DataFrame are of the Column type. When you pass a column value to a plain Python function, the function expects a numeric type, not the Column data type.
You can cast the value before passing it to the function:
from pyspark.sql.types import IntegerType
data_df = data_df.withColumn("samplecol", data_df["samplecol"].cast(IntegerType()))
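As a sketch of that idea applied to this question's data (assuming Latitude and Longitude had been read as strings), the coordinate columns could be cast to DoubleType; note that casting alone does not make a plain Python function accept Column arguments, it only fixes the column types:
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

# Hypothetical cast, only needed if the coordinates were read as strings
df2 = df2.withColumn("Latitude", col("Latitude").cast(DoubleType())) \
         .withColumn("Longitude", col("Longitude").cast(DoubleType()))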
After reading the sample data from a csv file, I used inferSchema to get rid of the cast:
df = spark.read.option("header", True).option("inferSchema", True).csv(...)
df now has the schema
root
|-- Timestamp: integer (nullable = true)
|-- User: string (nullable = true)
|-- Latitude: double (nullable = true)
|-- Longitude: double (nullable = true)
|-- bin: string (nullable = true)
|-- BID: string (nullable = true)
Next, the udf can be defined:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
bearing = udf(direction_lookup, DoubleType())
Finally, the udf can be called with the line from the question.
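For completeness, a sketch of that call (assuming df2 is the DataFrame read above, w is the window from the question, and bearing is the udf just defined):
from pyspark.sql.functions import col, lag

bearing_df = df2.withColumn(
    "bearing",
    bearing(col("Longitude"), lag("Longitude", 1).over(w),
            col("Latitude"), lag("Latitude", 1).over(w)),
)
bearing_df.show()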
Two remarks that are only indirectly related to the question:
- The function direction_lookup should do a None check at the beginning, because the lag function returns None for the first row of each group; otherwise the calculation fails (a complete sketch follows after this list):
def direction_lookup(destination_x, origin_x, destination_y, origin_y):
    if origin_x is None or origin_y is None:
        return None
    deltaX = destination_x - origin_x
    [...]
- The window specification can also take two columns to define the partition (w = Window().partitionBy("User", "bin").orderBy("Timestamp")), so the BID column is not strictly required.
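A minimal sketch of the complete null-safe function, combining the check above with the original body from the question (nothing here beyond the two snippets already shown):
import math

def direction_lookup(destination_x, origin_x, destination_y, origin_y):
    # lag() yields None on the first row of each partition, so guard against it
    if origin_x is None or origin_y is None:
        return None
    deltaX = destination_x - origin_x
    deltaY = destination_y - origin_y
    # atan2 gives (-180, 180]; shift negative angles into [0, 360)
    degrees_temp = math.atan2(deltaX, deltaY) / math.pi * 180
    return degrees_temp + 360 if degrees_temp < 0 else degrees_temp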