Spark: count over a window not working at millisecond precision
You can create a window to count how many times a record has occurred in the last 7 days. But if you try to look at how many times the record has occurred at the millisecond level, it breaks down.
In short, df.timestamp.astype('Timestamp').cast("long") below converts the timestamp only to whole-second resolution; it discards the milliseconds. How do you turn the entire timestamp, including milliseconds, into a long? You need the value as a long so that it can be used with a window.
from pyspark.sql import Window
from pyspark.sql import functions as F

# Sample data: each session has two records less than a second apart
df = sqlContext.createDataFrame([
    ("a", "u8u", "2018-02-02 05:46:41.438357"),
    ("a", "u8u", "2018-02-02 05:46:41.439377"),
    ("a", "a3a", "2018-02-02 09:48:34.081818"),
    ("a", "a3a", "2018-02-02 09:48:34.095586"),
    ("a", "g8g", "2018-02-02 09:48:56.006206"),
    ("a", "g8g", "2018-02-02 09:48:56.007974"),
    ("a", "9k9", "2018-02-02 12:50:48.000000"),
    ("a", "9k9", "2018-02-02 12:50:48.100000"),
], ["person_id", "session_id", "timestamp"])

# Casting to long truncates the timestamp to whole seconds
df = df.withColumn('unix_ts', df.timestamp.astype('Timestamp').cast("long"))
df = df.withColumn("DayOfWeek", F.date_format(df.timestamp, 'EEEE'))

# Count earlier records within the preceding 7 days (range bounds in seconds)
w = Window.partitionBy('person_id', 'DayOfWeek').orderBy('unix_ts').rangeBetween(-86400*7, -1)
df = df.withColumn('count', F.count('unix_ts').over(w))
df.sort(df.unix_ts).show(20, False)
+---------+----------+--------------------------+----------+---------+-----+
|person_id|session_id|timestamp |unix_ts |DayOfWeek|count|
+---------+----------+--------------------------+----------+---------+-----+
|a |u8u |2018-02-02 05:46:41.438357|1517572001|Friday |0 |
|a |u8u |2018-02-02 05:46:41.439377|1517572001|Friday |0 |
|a |a3a |2018-02-02 09:48:34.081818|1517586514|Friday |2 |
|a |a3a |2018-02-02 09:48:34.095586|1517586514|Friday |2 |
|a |g8g |2018-02-02 09:48:56.006206|1517586536|Friday |4 |
|a |g8g |2018-02-02 09:48:56.007974|1517586536|Friday |4 |
|a |9k9 |2018-02-02 12:50:48.000000|1517597448|Friday |6 |
|a |9k9 |2018-02-02 12:50:48.100000|1517597448|Friday |6 |
+---------+----------+--------------------------+----------+---------+-----+
The count should be 0,1,2,3,4,5,... rather than 0,0,2,2,4,4,...: because both records in each pair truncate to the same unix_ts, the rangeBetween(-86400*7, -1) frame excludes rows with an identical value, so each record only sees the earlier pairs.
You can use pyspark.sql.functions.unix_timestamp() to convert the string column to a timestamp, instead of casting it to a long.
import pyspark.sql.functions as F
df.select(
"timestamp",
F.unix_timestamp(F.col("timestamp"), "yyyy-MM-dd HH:mm:ss.SSSSSS").alias("unix_ts")
).show(truncate=False)
#+--------------------------+----------+
#|timestamp |unix_ts |
#+--------------------------+----------+
#|2018-02-02 05:46:41.438357|1517568839|
#|2018-02-02 05:46:41.439377|1517568840|
#|2018-02-02 09:48:34.081818|1517582995|
#|2018-02-02 09:48:34.095586|1517583009|
#|2018-02-02 09:48:56.006206|1517582942|
#|2018-02-02 09:48:56.007974|1517582943|
#|2018-02-02 12:50:48.862644|1517594710|
#|2018-02-02 12:50:49.981848|1517594830|
#+--------------------------+----------+
The second argument to unix_timestamp() is the format string. In your case, use "yyyy-MM-dd HH:mm:ss.SSSSSS".
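To see what the format argument does in isolation, a minimal sketch (assuming the same df as above and Spark 2.x, where unix_timestamp() parses via Java's SimpleDateFormat):

df.select(
    "timestamp",
    # without a fractional pattern the sub-second part is ignored, so
    # records within the same second collapse to the same value
    F.unix_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss").alias("seconds_only"),
    # with the fractional pattern each record maps to a distinct value
    F.unix_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss.SSSSSS").alias("with_fraction"),
).show(4, False)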
The corresponding change applied to your code is:
df = df.withColumn(
'unix_ts',
F.unix_timestamp(F.col("timestamp"), "yyyy-MM-dd HH:mm:ss.SSSSSS")
)
df = df.withColumn("DayOfWeek", F.date_format(df.timestamp, 'EEEE'))
w = Window.partitionBy('person_id','DayOfWeek').orderBy('unix_ts').rangeBetween(-86400*7,-1)
df = df.withColumn('count',F.count('unix_ts').over(w))
df.sort(df.unix_ts).show(20,False)
#+---------+----------+--------------------------+----------+---------+-----+
#|person_id|session_id|timestamp |unix_ts |DayOfWeek|count|
#+---------+----------+--------------------------+----------+---------+-----+
#|a |u8u |2018-02-02 05:46:41.438357|1517568839|Friday |0 |
#|a |u8u |2018-02-02 05:46:41.439377|1517568840|Friday |1 |
#|a |g8g |2018-02-02 09:48:56.006206|1517582942|Friday |2 |
#|a |g8g |2018-02-02 09:48:56.007974|1517582943|Friday |3 |
#|a |a3a |2018-02-02 09:48:34.081818|1517582995|Friday |4 |
#|a |a3a |2018-02-02 09:48:34.095586|1517583009|Friday |5 |
#|a |9k9 |2018-02-02 12:50:48.862644|1517594710|Friday |6 |
#|a |9k9 |2018-02-02 12:50:49.981848|1517594830|Friday |7 |
#+---------+----------+--------------------------+----------+---------+-----+
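As an aside that goes beyond the answer above: if you need the long value itself to keep sub-second resolution, one possible sketch is to cast through double, which in Spark yields epoch seconds with the fractional part intact, and scale to milliseconds before the final cast (the column name unix_ms and the millisecond window bounds are illustrative, not from the original post):

# string -> timestamp -> double keeps fractional seconds; scaling by 1000
# before cast("long") preserves the millisecond digits that a direct
# cast("long") would truncate. unix_ms is a hypothetical column name.
df = df.withColumn(
    'unix_ms',
    (F.col('timestamp').cast('timestamp').cast('double') * 1000).cast('long')
)
# The window range must then be expressed in milliseconds as well:
w_ms = Window.partitionBy('person_id', 'DayOfWeek').orderBy('unix_ms') \
             .rangeBetween(-86400 * 7 * 1000, -1)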