datetime range filter in PySpark SQL
What is the right way to filter a data frame by a timestamp field?
I have tried different date formats and forms of filtering, but nothing helps: either pyspark returns 0 objects, or it throws an error that it doesn't understand the datetime format.
This is what I have so far:
import datetime

from pyspark import SparkContext
from pyspark.sql import SQLContext
from django.utils import timezone
from django.conf import settings
from myapp.models import Collection
sc = SparkContext("local", "DjangoApp")
sqlc = SQLContext(sc)
url = "jdbc:postgresql://%(HOST)s/%(NAME)s?user=%(USER)s&password=%(PASSWORD)s" % settings.DATABASES['default']
sf = sqlc.load(source="jdbc", url=url, dbtable='myapp_collection')
The range for the timestamp field:
system_tz = timezone.pytz.timezone(settings.TIME_ZONE)
date_from = datetime.datetime(2014, 4, 16, 18, 30, 0, 0, tzinfo=system_tz)
date_to = datetime.datetime(2015, 6, 15, 18, 11, 59, 999999, tzinfo=system_tz)
Attempt 1
date_filter = "my_col >= '%s' AND my_col <= '%s'" % (
    date_from.isoformat(), date_to.isoformat()
)
sf = sf.filter(date_filter)
sf.count()
Out[12]: 0
Attempt 2
sf = sf.filter(sf.my_col >= date_from).filter(sf.my_col <= date_to)
sf.count()
---------------------------------------------------------------------------
Py4JJavaError: An error occurred while calling o63.count.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 4.0 failed 1 times, most recent failure:
Lost task 0.0 in stage 4.0 (TID 3, localhost): org.postgresql.util.PSQLException:
ERROR: syntax error at or near "18"
#
# oops.. doesn't JDBC understand the 24h time format??
Attempt 3
sf = sf.filter("my_col BETWEEN '%s' AND '%s'" % \
    (date_from.isoformat(), date_to.isoformat())
)
---------------------------------------------------------------------------
Py4JJavaError: An error occurred while calling o97.count.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 17.0 failed 1 times, most recent failure:
Lost task 0.0 in stage 17.0 (TID 13, localhost): org.postgresql.util.PSQLException:
ERROR: syntax error at or near "18"
The data does exist in the table, though:
django_filters = {
'my_col__gte': date_from,
'my_col__lte': date_to
}
Collection.objects.filter(**django_filters).count()
Out[17]: 1093436
Or like this:
django_range_filter = {'my_col__range': (date_from, date_to)}
Collection.objects.filter(**django_range_filter).count()
Out[19]: 1093436
Assuming your data frame looks as follows:
sf = sqlContext.createDataFrame([
    [datetime.datetime(2013, 6, 29, 11, 34, 29)],
    [datetime.datetime(2015, 7, 14, 11, 34, 27)],
    [datetime.datetime(2012, 3, 10, 19, 00, 11)],
    [datetime.datetime(2016, 2, 8, 12, 21)],
    [datetime.datetime(2014, 4, 4, 11, 28, 29)]
], ('my_col', ))
with the schema:
root
|-- my_col: timestamp (nullable = true)
and you want to find dates in the following range:
import datetime, time
dates = ("2013-01-01 00:00:00", "2015-07-01 00:00:00")
timestamps = (
    time.mktime(datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S").timetuple())
    for s in dates)
You can query using timestamps computed on the driver side:
q1 = "CAST(my_col AS INT) BETWEEN {0} AND {1}".format(*timestamps)
sf.where(q1).show()
or using the unix_timestamp function:
q2 = """CAST(my_col AS INT)
BETWEEN unix_timestamp('{0}', 'yyyy-MM-dd HH:mm:ss')
AND unix_timestamp('{1}', 'yyyy-MM-dd HH:mm:ss')""".format(*dates)
sf.where(q2).show()
It is also possible to use a udf in a similar way; a sketch follows.
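This is only my own illustration of that approach, with assumed bounds, not a quoted answer: wrap a plain Python range check in a udf and apply it to the timestamp column.

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
import datetime

def in_range(ts,
             low=datetime.datetime(2013, 1, 1),
             high=datetime.datetime(2015, 7, 1)):
    # ts arrives as a Python datetime; compare against driver-defined bounds
    return ts is not None and low <= ts <= high

in_range_udf = udf(in_range, BooleanType())
sf.where(in_range_udf(sf.my_col)).show()

A udf like this is slower than the built-in expressions above, so it is mostly useful when the range logic is too awkward to express in SQL.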
If you use raw SQL, it is possible to extract different elements of the timestamp using year, date, and so on.
sf.registerTempTable("sf")  # register the DataFrame so raw SQL can refer to it as "sf"
sqlContext.sql("""SELECT * FROM sf
    WHERE YEAR(my_col) BETWEEN 2014 AND 2015""").show()
Edit:
Since Spark 1.5 you can use the built-in functions:
from pyspark.sql.functions import to_date, lit
from pyspark.sql.types import TimestampType

dates = ("2013-01-01", "2015-07-01")
date_from, date_to = [to_date(lit(s)).cast(TimestampType()) for s in dates]
sf.where((sf.my_col > date_from) & (sf.my_col < date_to))
You can also use pyspark.sql.Column.between, which is inclusive of the bounds:
from pyspark.sql.functions import col
sf.where(col('my_col').between(*dates)).show(truncate=False)
#+---------------------+
#|my_col               |
#+---------------------+
#|2013-06-29 11:34:29.0|
#|2014-04-04 11:28:29.0|
#+---------------------+
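Note that with bare date strings the bounds are effectively midnight on each date, so rows falling later on the end day are excluded; if the whole end day should be included, one option (still the same sample column) is to pass full timestamps:

sf.where(col('my_col').between('2013-01-01 00:00:00',
                               '2015-07-01 23:59:59')).show(truncate=False)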
How about something like this:
import pyspark.sql.functions as func
df = df.select(func.to_date(df.my_col).alias("time"))
sf = df.filter(df.time > date_from).filter(df.time < date_to)
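For this snippet to run, date_from and date_to still have to be defined somewhere; since to_date() truncates my_col to day precision, plain date strings are enough (the values below are just placeholders for illustration):

# assumed bounds for the snippet above; adjust to the actual range
date_from = "2014-04-16"
date_to = "2015-06-15"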
The following seems to be working for me (someone let me know if this is bad form or inaccurate, though)...
First, create a new column for each end of the window (in this example, it's 100 to 200 days after the date in column column_name).
from pyspark.sql import functions as F
new_df = new_df.withColumn('After100Days', F.lit(F.date_add(new_df['column_name'], 100)))
new_df = new_df.withColumn('After200Days', F.lit(F.date_add(new_df['column_name'], 200)))
Then filter as follows...
To filter for dates inside a certain range:
result = df.where((df.col1 > df.col2) & (df.col1 < df.col3))
To filter for dates outside a certain range:
result = df.where((df.col1 < df.col2) | (df.col1 > df.col3))
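Putting the two pieces together, and assuming a purely hypothetical event_date column that should fall 100 to 200 days after column_name, the whole thing might look like:

# hypothetical example: keep rows whose event_date lies in the
# 100-to-200-day window after column_name
result = new_df.where(
    (new_df['event_date'] > new_df['After100Days']) &
    (new_df['event_date'] < new_df['After200Days'])
)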