Pyspark: Filter dataframe and apply function to offset time
I have a dataframe like this:
import time
import datetime
import pandas as pd
df = pd.DataFrame({'Number': ['1', '2', '1', '1'],
                   'Letter': ['A', 'A', 'B', 'A'],
                   'Time': ['2019-04-30 18:15:00', '2019-04-30 18:15:00',
                            '2019-04-30 18:15:00', '2019-04-30 18:15:00'],
                   'Value': [30, 30, 30, 60]})
df['Time'] = pd.to_datetime(df['Time'])
Number Letter Time Value
0 1 A 2019-04-30 18:15:00 30
1 2 A 2019-04-30 18:15:00 30
2 1 B 2019-04-30 18:15:00 30
3 1 A 2019-04-30 18:15:00 60
In Pyspark, I want to do something similar to how I filter a specific subset of the data in Pandas:
#: Want to target only rows where the Number = '1' and the Letter is 'A'.
target_df = df[
    (df['Number'] == '1') &
    (df['Letter'] == 'A')
]
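For reference, a sketch of the equivalent row filter in PySpark (this assumes pyspark_df = spark.createDataFrame(df), as in my attempt further down):
import pyspark.sql.functions as F

# Sketch only: pyspark_df is the Spark version of the pandas frame above.
target_rows = pyspark_df.filter(
    (F.col('Number') == '1') & (F.col('Letter') == 'A')
)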
and apply a change to the values based on another column:
#: Loop over these rows and subtract the offset value from the Time.
for index, row in target_df.iterrows():
    offset = row['Value']
    df.loc[index, 'Time'] = row['Time'] - datetime.timedelta(seconds=offset)
to get a final output like this:
Number Letter Time Value
0 1 A 2019-04-30 18:14:30 30
1 2 A 2019-04-30 18:15:00 30
2 1 B 2019-04-30 18:15:00 30
3 1 A 2019-04-30 18:14:00 60
What is the best way to approach this in Pyspark? I was thinking of something along these lines:
pyspark_df = spark.createDataFrame(df)
pyspark_df.withColumn('new_time', F.when(
    F.col('Number') == '1' & F.col('Letter' == 'A'),
    F.col('Time') - datetime.timedelta(seconds=(F.col('Value')))).otherwise(
    F.col('Time')))
But this doesn't seem to work for me.
You can try using unix timestamps. datetime.timedelta can't take a Column as its seconds argument, but converting Time to epoch seconds with unix_timestamp lets you subtract the Value column directly (note also that & binds tighter than ==, so each comparison needs its own parentheses):
import pyspark.sql.functions as F
cond_val = (F.when((F.col("Number") == 1) & (F.col("Letter") == "A"),
                   F.from_unixtime(F.unix_timestamp(F.col("Time")) - F.col("Value")))
            .otherwise(F.col("Time")))
pyspark_df.withColumn("Time", cond_val).show()
+------+------+-------------------+-----+
|Number|Letter| Time|Value|
+------+------+-------------------+-----+
| 1| A|2019-04-30 18:14:30| 30|
| 2| A|2019-04-30 18:15:00| 30|
| 1| B|2019-04-30 18:15:00| 30|
| 1| A|2019-04-30 18:14:00| 60|
+------+------+-------------------+-----+
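If you are on Spark 3.0+, another option is to subtract an interval built from the Value column. This is only a sketch under that version assumption: it relies on make_interval(years, months, weeks, days, hours, mins, secs), which is available as a SQL function in Spark 3.0+, with the Time and Value column names taken from the question:
# Sketch assuming Spark 3.0+: make_interval builds an interval of Value seconds,
# which Spark can subtract from a timestamp column directly.
result = pyspark_df.withColumn(
    "Time",
    F.when(
        (F.col("Number") == 1) & (F.col("Letter") == "A"),
        F.expr("Time - make_interval(0, 0, 0, 0, 0, 0, Value)"),
    ).otherwise(F.col("Time")),
)
result.show()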
As a side note, you don't need iterrows in pandas for this; just:
c = df['Number'].eq('1') & df['Letter'].eq('A')
df.loc[c, 'Time'] = df['Time'].sub(pd.to_timedelta(df['Value'], unit='s'))

# or faster:
# import numpy as np
# df['Time'] = np.where(c, df['Time'].sub(pd.to_timedelta(df['Value'], unit='s')),
#                       df['Time'])