Pyspark and error "TypeError: must be real number, not Column", when attempting to find compass bearing using a defined function over a window

I am trying to apply a function that finds the compass bearing between two points over a window in pyspark (using the Community Edition of Databricks). I am new to Spark and I am sure I am missing something.

I have a test dataframe that looks like this (I borrowed it from a similar question and added a column called "bin"):

Timestamp,User,Latitude,Longitude,bin,BID
1462838468,49B4361512443A4DA,39.777982,-7.054599,A1,49B4361512443A4DA A1
1462838512,49B4361512443A4DA,39.777982,-7.054599,A1,49B4361512443A4DA A1
1462838389,49B4361512443A4DA,39.777982,-7.054599,A1,49B4361512443A4DA A1
1462838497,49B4361512443A4DA,39.777982,-7.054599,A1,49B4361512443A4DA A1
1465975885,6E9E0581E2A032FD8,37.118362,-8.205041,A1,6E9E0581E2A032FD8 A1
1457723815,405C238E25FE0B9E7,37.177322,-7.426781,A1,405C238E25FE0B9E7 A1
1457897289,405C238E25FE0B9E7,37.177922,-7.447443,A1,405C238E25FE0B9E7 A1
1457899229,405C238E25FE0B9E7,37.177922,-7.447443,A1,405C238E25FE0B9E7 A1
1457972626,405C238E25FE0B9E7,37.18059,-7.46128,A1,405C238E25FE0B9E7 A1
1458062553,405C238E25FE0B9E7,37.177322,-7.426781,A1,405C238E25FE0B9E7 A1
1458241825,405C238E25FE0B9E7,37.178172,-7.444512,A1,405C238E25FE0B9E7 A1
1458244457,405C238E25FE0B9E7,37.178172,-7.444512,A1,405C238E25FE0B9E7 A1
1458412513,405C238E25FE0B9E7,37.177322,-7.426781,A1,405C238E25FE0B9E7 A1
1458412292,405C238E25FE0B9E7,37.177322,-7.426781,A1,405C238E25FE0B9E7 A1
1465197963,6E9E0581E2A032FD8,37.118362,-8.205041,B2,6E9E0581E2A032FD8 B2
1465202192,6E9E0581E2A032FD8,37.118362,-8.205041,B2,6E9E0581E2A032FD8 B2
1465923817,6E9E0581E2A032FD8,37.118362,-8.205041,B2,6E9E0581E2A032FD8 B2
1465923766,6E9E0581E2A032FD8,37.118362,-8.205041,B2,6E9E0581E2A032FD8 B2
1465923748,6E9E0581E2A032FD8,37.118362,-8.205041,B2,6E9E0581E2A032FD8 B2
1465923922,6E9E0581E2A032FD8,37.118362,-8.205041,B2,6E9E0581E2A032FD8 B2
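
For reference, a few of these rows can also be turned into a DataFrame directly, without a csv file (a minimal sketch; on Databricks a SparkSession named spark already exists, and df2 is the name used further down):

# build a small test frame from the first few rows of user 405C238E25FE0B9E7
rows = [
    (1457723815, "405C238E25FE0B9E7", 37.177322, -7.426781, "A1", "405C238E25FE0B9E7 A1"),
    (1457897289, "405C238E25FE0B9E7", 37.177922, -7.447443, "A1", "405C238E25FE0B9E7 A1"),
    (1457972626, "405C238E25FE0B9E7", 37.18059, -7.46128, "A1", "405C238E25FE0B9E7 A1"),
]
df2 = spark.createDataFrame(rows, ["Timestamp", "User", "Latitude", "Longitude", "bin", "BID"])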

These are the libraries:

from pyspark.sql.functions import acos, cos, sin, lit, toRadians
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import math
from pyspark.sql.functions import concat, col, lit, lag, udf
from pyspark.sql.types import LongType

This is the function:

def direction_lookup(destination_x, origin_x, destination_y, origin_y):
    deltaX = destination_x - origin_x
    deltaY = destination_y - origin_y
    degrees_temp = math.atan2(deltaX, deltaY) / math.pi * 180
    if degrees_temp < 0:
        degrees_final = 360 + degrees_temp
    else:
        degrees_final = degrees_temp
    return degrees_final
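
A quick sanity check with plain Python floats (no Spark involved) confirms the function itself works; the expected value below is my own rough calculation:

# bearing between two consecutive points of user 405C238E25FE0B9E7
print(direction_lookup(-7.447443, -7.426781, 37.177922, 37.177322))
# ~271.66 degrees, i.e. slightly north of due west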

I define the window using a column that concatenates "User" and "bin" (which I named "BID"), ordered by timestamp:

w = Window().partitionBy("BID").orderBy("Timestamp")

Then I try to calculate the bearing between consecutive timestamps like this:

bearing_df = df2.withColumn("bearing", bearing("Longitude", lag("Longitude", 1).over(w), "Latitude", lag("Latitude", 1).over(w)))

The function works if I just pass it plain coordinates, so something is wrong with how I am applying it in Spark. I have also used this same workflow successfully to calculate the distance between points (with a different function, obviously). I tried registering it as a UDF, but got the same error, "TypeError: must be real number, not Column". I am not sure what to try next, and I am still quite new to pyspark. Please help!

The columns of a DataFrame are of type Column. When you pass a column to a plain function, the function expects a numeric type, not the Column data type.

You can cast the values before passing them to the function:

from pyspark.sql.types import IntegerType
data_df = data_df.withColumn("samplecol", data_df["samplecol"].cast(IntegerType()))

After reading the sample data from a csv file, I used inferSchema to avoid the casts:

df = spark.read.option("header", True).option("inferSchema", True).csv(...)

df now has the schema

root
 |-- Timestamp: integer (nullable = true)
 |-- User: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- bin: string (nullable = true)
 |-- BID: string (nullable = true)

Next, the udf can be defined:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
bearing = udf(direction_lookup, DoubleType())

Finally, the udf can be called with the line from the question.
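
That is (unchanged from the question, except that bearing now refers to the udf registered above):

bearing_df = df2.withColumn(
    "bearing",
    bearing("Longitude", lag("Longitude", 1).over(w),
            "Latitude", lag("Latitude", 1).over(w)),
)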


Two remarks that are only indirectly related to the question:

  • The function direction_lookup should do a None check at the beginning. The lag function returns None for the first row of each group, and the calculation would fail otherwise:
def direction_lookup(destination_x, origin_x, destination_y, origin_y):
    if origin_x is None or origin_y is None:
        return None
    deltaX = destination_x - origin_x
    [...]
  • The window specification could also take the two columns to define the partitions (w = Window().partitionBy("User","bin").orderBy("Timestamp")), so the BID column is not strictly required; see the combined sketch below.
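
Putting both remarks together, a sketch of the whole pipeline could look like this (assuming df is the frame read with inferSchema above; one way to do it, not the only one):

from pyspark.sql.functions import lag, udf
from pyspark.sql.types import DoubleType
from pyspark.sql.window import Window
import math

def direction_lookup(destination_x, origin_x, destination_y, origin_y):
    # lag() yields None on the first row of each partition
    if origin_x is None or origin_y is None:
        return None
    deltaX = destination_x - origin_x
    deltaY = destination_y - origin_y
    degrees_temp = math.atan2(deltaX, deltaY) / math.pi * 180
    return degrees_temp + 360 if degrees_temp < 0 else degrees_temp

bearing = udf(direction_lookup, DoubleType())

# partition directly on the two source columns instead of the concatenated BID
w = Window().partitionBy("User", "bin").orderBy("Timestamp")

bearing_df = df.withColumn(
    "bearing",
    bearing("Longitude", lag("Longitude", 1).over(w),
            "Latitude", lag("Latitude", 1).over(w)),
)
bearing_df.show()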