pyspark,比较数据框中的两行
pyspark, Compare two rows in dataframe
我正在尝试将数据框中的一行与下一行进行比较,以查看时间戳的差异。目前数据如下:
itemid | eventid | timestamp
----------------------------
134 | 30 | 2016-07-02 12:01:40
134 | 32 | 2016-07-02 12:21:23
125 | 30 | 2016-07-02 13:22:56
125 | 32 | 2016-07-02 13:27:07
我已经尝试将一个函数映射到数据框上以允许像这样进行比较:(注意:我正在尝试获取差异大于 4 小时的行)
items = df.limit(10)\
.orderBy('itemid', desc('stamp'))\
.map(lambda x,y: (x.stamp - y.stamp) > 14400).collect()
但我收到以下错误:
Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.collectAndServe
我认为这是因为我错误地使用了地图功能。帮助使用地图,或者其他解决方案将不胜感激。
更新:
@zero323 的回答是关于我对映射的不当使用的信息,但是我使用的系统是 运行 2.02 之前的 Spark 版本并且我正在使用 Cassandra 中的数据。
我设法用 mapPartitions 解决了这个问题。请参阅下面我的回答。
更新(2017/03/27):
自从最初在此 post 上标记答案以来,我对 Spark 的理解有了显着提高。我在下面更新了我的答案以显示我当前的解决方案。
是的,您以错误的方式使用了 map
函数。 map
当时对单个元素进行操作。您可以尝试使用这样的 window 函数:
from pyspark.sql.functions import col, lag
from pyspark.sql.window import Window
df = (
sc.parallelize([
(134, 30, "2016-07-02 12:01:40"), (134, 32, "2016-07-02 12:21:23"),
(125, 30, "2016-07-02 13:22:56"), (125, 32, "2016-07-02 13:27:07"),
]).toDF(["itemid", "eventid", "timestamp"])
.withColumn("timestamp", col("timestamp").cast("timestamp"))
)
w = Window.partitionBy("itemid").orderBy("timestamp")
diff = col("timestamp").cast("long") - lag("timestamp", 1).over(w).cast("long")
df.withColumn("diff", diff)
@ShuaiYuan 对原答案的评论是正确的。在过去的一年里,我对 Spark 的工作原理有了更深入的了解,并且实际上重写了我为此 post.
编写的程序
新答案 (2017/03/27)
为了比较数据框的两行,我最终使用了 RDD。我按键(在本例中为项目 ID)对数据进行分组并忽略 eventid,因为它与此等式无关。然后我将一个 lambda 函数映射到行上,returning 一个键的元组和一个包含事件间隔开始和结束的元组列表,它派生自迭代列表的 "findGaps" 函数链接到每个键的值(排序的时间戳)。完成后,我过滤掉没有时间间隔的键,然后将 flatMapValues return 数据转换为更 sql 的格式。这是通过以下代码完成的:
# Find time gaps in list of datetimes where firings are longer than given duration.
def findGaps(dates, duration):
result = []
length = len(dates)
# convert to dates for comparison
first = toDate(dates[0])
last = toDate(dates[length - 1])
for index, item in enumerate(dates):
if index < length -1 and (dates[index + 1] - item).total_seconds() > duration:
# build outage tuple and append to list
# format (start, stop, duration)
result.append(formatResult(item, dates[index + 1], kind))
return result
outage_list = outage_join_df.rdd\
.groupByKey()\
.map(lambda row: (
row[0],
findGaps(
sorted(list(row[1])),
limit
)
)
)\
.filter(lambda row: len(row[1]) > 0)\
.flatMapValues(lambda row: row)\
.map(lambda row: (
row[0]['itemid'], # itemid
row[1][0].date(), # date
row[1][0], # start
row[1][1], # stop
row[1][2] # duration
))\
.collect()
原始答案(错误)
我设法使用 mapPartitions 解决了它:
def findOutage(items):
outages = []
lastStamp = None
for item in items:
if lastStamp and (lastStamp - item.stamp).total_seconds() > 14400:
outages.append({"item": item.itemid,
"start": item.stamp.isoformat(),
"stop": lastStamp.isoformat()})
lastStamp = item.stamp
return iter(outages)
items = df.limit(10).orderBy('itemid', desc('stamp'))
outages = items.mapPartitions(findOutage).collect()
感谢大家的帮助!
我正在尝试将数据框中的一行与下一行进行比较,以查看时间戳的差异。目前数据如下:
itemid | eventid | timestamp
----------------------------
134 | 30 | 2016-07-02 12:01:40
134 | 32 | 2016-07-02 12:21:23
125 | 30 | 2016-07-02 13:22:56
125 | 32 | 2016-07-02 13:27:07
我已经尝试将一个函数映射到数据框上以允许像这样进行比较:(注意:我正在尝试获取差异大于 4 小时的行)
items = df.limit(10)\
.orderBy('itemid', desc('stamp'))\
.map(lambda x,y: (x.stamp - y.stamp) > 14400).collect()
但我收到以下错误:
Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.collectAndServe
我认为这是因为我错误地使用了地图功能。帮助使用地图,或者其他解决方案将不胜感激。
更新: @zero323 的回答是关于我对映射的不当使用的信息,但是我使用的系统是 运行 2.02 之前的 Spark 版本并且我正在使用 Cassandra 中的数据。
我设法用 mapPartitions 解决了这个问题。请参阅下面我的回答。
更新(2017/03/27): 自从最初在此 post 上标记答案以来,我对 Spark 的理解有了显着提高。我在下面更新了我的答案以显示我当前的解决方案。
是的,您以错误的方式使用了 map
函数。 map
当时对单个元素进行操作。您可以尝试使用这样的 window 函数:
from pyspark.sql.functions import col, lag
from pyspark.sql.window import Window
df = (
sc.parallelize([
(134, 30, "2016-07-02 12:01:40"), (134, 32, "2016-07-02 12:21:23"),
(125, 30, "2016-07-02 13:22:56"), (125, 32, "2016-07-02 13:27:07"),
]).toDF(["itemid", "eventid", "timestamp"])
.withColumn("timestamp", col("timestamp").cast("timestamp"))
)
w = Window.partitionBy("itemid").orderBy("timestamp")
diff = col("timestamp").cast("long") - lag("timestamp", 1).over(w).cast("long")
df.withColumn("diff", diff)
@ShuaiYuan 对原答案的评论是正确的。在过去的一年里,我对 Spark 的工作原理有了更深入的了解,并且实际上重写了我为此 post.
编写的程序新答案 (2017/03/27)
为了比较数据框的两行,我最终使用了 RDD。我按键(在本例中为项目 ID)对数据进行分组并忽略 eventid,因为它与此等式无关。然后我将一个 lambda 函数映射到行上,returning 一个键的元组和一个包含事件间隔开始和结束的元组列表,它派生自迭代列表的 "findGaps" 函数链接到每个键的值(排序的时间戳)。完成后,我过滤掉没有时间间隔的键,然后将 flatMapValues return 数据转换为更 sql 的格式。这是通过以下代码完成的:
# Find time gaps in list of datetimes where firings are longer than given duration.
def findGaps(dates, duration):
result = []
length = len(dates)
# convert to dates for comparison
first = toDate(dates[0])
last = toDate(dates[length - 1])
for index, item in enumerate(dates):
if index < length -1 and (dates[index + 1] - item).total_seconds() > duration:
# build outage tuple and append to list
# format (start, stop, duration)
result.append(formatResult(item, dates[index + 1], kind))
return result
outage_list = outage_join_df.rdd\
.groupByKey()\
.map(lambda row: (
row[0],
findGaps(
sorted(list(row[1])),
limit
)
)
)\
.filter(lambda row: len(row[1]) > 0)\
.flatMapValues(lambda row: row)\
.map(lambda row: (
row[0]['itemid'], # itemid
row[1][0].date(), # date
row[1][0], # start
row[1][1], # stop
row[1][2] # duration
))\
.collect()
原始答案(错误)
我设法使用 mapPartitions 解决了它:
def findOutage(items):
outages = []
lastStamp = None
for item in items:
if lastStamp and (lastStamp - item.stamp).total_seconds() > 14400:
outages.append({"item": item.itemid,
"start": item.stamp.isoformat(),
"stop": lastStamp.isoformat()})
lastStamp = item.stamp
return iter(outages)
items = df.limit(10).orderBy('itemid', desc('stamp'))
outages = items.mapPartitions(findOutage).collect()
感谢大家的帮助!